http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java 
b/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java
deleted file mode 100644
index 3531f3b..0000000
--- a/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/** Wrap ChannelBuffer as a DataInput */
-public class WrappedChannelBuffer implements DataInput {
-  private ChannelBuffer channelBuffer;
-
-  public WrappedChannelBuffer() {
-  }
-
-  public WrappedChannelBuffer(ChannelBuffer channelBuffer) {
-    this.channelBuffer = channelBuffer;
-  }
-
-  public void setChannelBuffer(ChannelBuffer channelBuffer) {
-    this.channelBuffer = channelBuffer;
-  }
-
-  @Override
-  public void readFully(byte[] b) throws IOException {
-    channelBuffer.readBytes(b);
-  }
-
-  @Override
-  public void readFully(byte[] b, int off, int len) throws IOException {
-    channelBuffer.readBytes(b, off, len);
-  }
-
-  @Override
-  public int skipBytes(int n) throws IOException {
-    channelBuffer.skipBytes(n);
-    return n;
-  }
-
-  @Override
-  public boolean readBoolean() throws IOException {
-    return channelBuffer.readByte() != 0;
-  }
-
-  @Override
-  public byte readByte() throws IOException {
-    return channelBuffer.readByte();
-  }
-
-  @Override
-  public int readUnsignedByte() throws IOException {
-    return channelBuffer.readUnsignedByte();
-  }
-
-  @Override
-  public short readShort() throws IOException {
-    return channelBuffer.readShort();
-  }
-
-  @Override
-  public int readUnsignedShort() throws IOException {
-    return channelBuffer.readUnsignedShort();
-  }
-
-  @Override
-  public char readChar() throws IOException {
-    return channelBuffer.readChar();
-  }
-
-  @Override
-  public int readInt() throws IOException {
-    return channelBuffer.readInt();
-  }
-
-  @Override
-  public long readLong() throws IOException {
-    return channelBuffer.readLong();
-  }
-
-  @Override
-  public float readFloat() throws IOException {
-    return channelBuffer.readFloat();
-  }
-
-  @Override
-  public double readDouble() throws IOException {
-    return channelBuffer.readDouble();
-  }
-
-  @Override
-  public String readLine() throws IOException {
-    throw new UnsupportedOperationException("readLine is not supported");
-  }
-
-  @Override
-  public String readUTF() throws IOException {
-    throw new UnsupportedOperationException("readUTF is not supported");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/AkkaHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/AkkaHelper.java 
b/core/src/main/java/io/gearpump/util/AkkaHelper.java
deleted file mode 100644
index 286fabf..0000000
--- a/core/src/main/java/io/gearpump/util/AkkaHelper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-
-public class AkkaHelper {
-
-  /**
-   * Helper util to access the private[akka] system.actorFor method
-   *
-   * This is used for performance optimization, we encode the session Id
-   * in the ActorRef path. Session Id is used to identity sender Task.
-   *
-   * @param system ActorSystem
-   * @param path Relative or absolute path of this Actor System.
-   * @return Full qualified ActorRef.
-   */
-  @SuppressWarnings("deprecation")
-  public static ActorRef actorFor(ActorSystem system, String path) {
-    return system.actorFor(path);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java 
b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
deleted file mode 100644
index 93b65e8..0000000
--- a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util;
-
-import org.apache.log4j.RollingFileAppender;
-
-/**
- * Log4j appender for to write to user specified Hadoop filesystem.
- */
-public class HadoopFSLogAppender extends RollingFileAppender {
-  //TODO: implement Log appender on Hadoop filesystem
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java 
b/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java
deleted file mode 100644
index acbd1cb..0000000
--- a/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util;
-
-import org.apache.log4j.RollingFileAppender;
-import org.apache.log4j.spi.LoggingEvent;
-
-import java.io.File;
-
-/** When log file is deleted, tried to recreate it in file system  */
-public class RecreateRollingFileAppender extends RollingFileAppender {
-
-  protected long checkFileInterval = 60L;
-  private long lastCheckTime = 0L;
-
-  @Override
-  public void append(LoggingEvent event) {
-    checkInterval();
-    super.append(event);
-  }
-
-  private void checkInterval() {
-    long currentTime = System.currentTimeMillis();
-    if ((currentTime - lastCheckTime) > (checkFileInterval * 1000)) {
-      checkLogFileExist();
-      lastCheckTime = currentTime;
-    }
-  }
-
-  private void checkLogFileExist() {
-    String fileName = super.fileName;
-    if (fileName != null) {
-      File logFile = new File(fileName);
-      if (!logFile.exists()) {
-        this.setFile(fileName);
-        this.activateOptions();
-      }
-    }
-  }
-
-  public long getCheckFileInterval() {
-    return this.checkFileInterval;
-  }
-
-  public void setCheckFileInterval(long checkFileInterval) {
-    this.checkFileInterval = checkFileInterval;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java
 
b/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java
new file mode 100644
index 0000000..8d3415c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+public interface ITransportMessageSerializer {
+
+  int getLength(Object obj);
+
+  void serialize(DataOutput dataOutput, Object transportMessage);
+
+  Object deserialize(DataInput dataInput, int length);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java 
b/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java
new file mode 100644
index 0000000..7ee85f3
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import io.gearpump.google.common.io.Closeables;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Netty message on the wire is wrapped as MessageBatch
+ */
+public class MessageBatch {
+  private static final Logger log = 
LoggerFactory.getLogger(MessageBatch.class);
+
+  private int buffer_size;
+  private List<TaskMessage> messages;
+  private int encoded_length;
+  private ITransportMessageSerializer serializer;
+
+  MessageBatch(int buffer_size, ITransportMessageSerializer serializer) {
+    this.buffer_size = buffer_size;
+    messages = new ArrayList<TaskMessage>();
+    encoded_length = 0;
+    this.serializer = serializer;
+  }
+
+  void add(TaskMessage taskMessage) {
+    if (taskMessage == null) {
+      throw new RuntimeException("null object forbidden in a message batch");
+    }
+
+    messages.add(taskMessage);
+    encoded_length += msgEncodeLength(taskMessage);
+  }
+
+  TaskMessage get(int index) {
+    return messages.get(index);
+  }
+
+  /**
+   * try to add a TaskMessage to a batch
+   *
+   * @param taskMsg - {@link org.apache.gearpump.transport.netty.TaskMessage}
+   * @return false if the msg could not be added due to buffer size limit; 
true otherwise
+   */
+  boolean tryAdd(TaskMessage taskMsg) {
+    if ((encoded_length + msgEncodeLength(taskMsg)) <= buffer_size) {
+      add(taskMsg);
+      return true;
+    }
+    return false;
+  }
+
+  private int msgEncodeLength(TaskMessage taskMsg) {
+    int size = 0;
+    if (taskMsg != null) {
+      size = 24; //sessionId(INT) + sourceTask(LONG) + targetTask(LONG) + 
messageLength(INT)
+      if (taskMsg.message() != null) {
+        size += serializer.getLength(taskMsg.message());
+      }
+    }
+    return size;
+  }
+
+  /**
+   * @return true, if allowed buffer is Full
+   */
+  boolean isFull() {
+    return encoded_length >= buffer_size;
+  }
+
+  /**
+   * @return true, if no messages in this batch
+   */
+  boolean isEmpty() {
+    return messages.isEmpty();
+  }
+
+  /**
+   * @return number of messages available in this batch
+   */
+  int size() {
+    return messages.size();
+  }
+
+  /**
+   * create a buffer containing the encoding of this batch
+   */
+  ChannelBuffer buffer() throws IOException {
+    ChannelBufferOutputStream bout =
+      new 
ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+
+    try {
+      for (TaskMessage msg : messages) {
+        writeTaskMessage(bout, msg);
+      }
+      return bout.buffer();
+    } catch (IOException e) {
+      log.error("Error while writing Tasks to Channel Buffer - {}", 
e.getMessage());
+    } finally {
+      Closeables.close(bout, false);
+    }
+    return null;
+  }
+
+  /**
+   * write a TaskMessage into a stream
+   * <p>
+   * Each TaskMessage is encoded as:
+   * sessionId ... int(4)
+   * source task ... Long(8)
+   * target task ... long(8)
+   * len ... int(4)
+   * payload ... byte[]     *
+   */
+  private void writeTaskMessage(ChannelBufferOutputStream bout,
+      TaskMessage message) throws IOException {
+    long target_id = message.targetTask();
+    long source_id = message.sourceTask();
+    int sessionId = message.sessionId();
+    int msgLength = serializer.getLength(message.message());
+
+    bout.writeInt(sessionId);
+    bout.writeLong(target_id);
+    bout.writeLong(source_id);
+    bout.writeInt(msgLength);
+    serializer.serialize(bout, message.message());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java 
b/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java
new file mode 100644
index 0000000..761bb38
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessageDecoder extends FrameDecoder {
+  private ITransportMessageSerializer serializer;
+  private WrappedChannelBuffer dataInput = new WrappedChannelBuffer();
+
+  public MessageDecoder(ITransportMessageSerializer serializer) {
+    this.serializer = serializer;
+  }
+
+  /*
+   * Each TaskMessage is encoded as:
+   *  sessionId ... int(4)
+   *  source task ... long(8)
+   *  target task ... long(8)
+   *  len ... int(4)
+   *  payload ... byte[]     *
+   */
+  protected List<TaskMessage> decode(ChannelHandlerContext ctx, Channel 
channel,
+      ChannelBuffer buf) {
+    this.dataInput.setChannelBuffer(buf);
+
+    final int SESION_LENGTH = 4; //int
+    final int SOURCE_TASK_LENGTH = 8; //long
+    final int TARGET_TASK_LENGTH = 8; //long
+    final int MESSAGE_LENGTH = 4; //int
+    final int HEADER_LENGTH = SESION_LENGTH + SOURCE_TASK_LENGTH + 
TARGET_TASK_LENGTH + MESSAGE_LENGTH;
+
+    // Make sure that we have received at least a short message
+    long available = buf.readableBytes();
+    if (available < HEADER_LENGTH) {
+      //need more data
+      return null;
+    }
+
+    List<TaskMessage> taskMessageList = new ArrayList<TaskMessage>();
+
+    // Use while loop, try to decode as more messages as possible in single 
call
+    while (available >= HEADER_LENGTH) {
+
+      // Mark the current buffer position before reading task/len field
+      // because the whole frame might not be in the buffer yet.
+      // We will reset the buffer position to the marked position if
+      // there's not enough bytes in the buffer.
+      buf.markReaderIndex();
+
+      int sessionId = buf.readInt();
+      long targetTask = buf.readLong();
+      long sourceTask = buf.readLong();
+      // Read the length field.
+      int length = buf.readInt();
+
+      available -= HEADER_LENGTH;
+
+      if (length <= 0) {
+        taskMessageList.add(new TaskMessage(sessionId, targetTask, sourceTask, 
null));
+        break;
+      }
+
+      // Make sure if there's enough bytes in the buffer.
+      if (available < length) {
+        // The whole bytes were not received yet - return null.
+        buf.resetReaderIndex();
+        break;
+      }
+      available -= length;
+
+      // There's enough bytes in the buffer. Read it.
+      Object message = serializer.deserialize(dataInput, length);
+
+      // Successfully decoded a frame.
+      // Return a TaskMessage object
+      taskMessageList.add(new TaskMessage(sessionId, targetTask, sourceTask, 
message));
+    }
+
+    return taskMessageList.size() == 0 ? null : taskMessageList;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java 
b/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java
new file mode 100644
index 0000000..008c19d
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {
+  @Override
+  protected Object encode(ChannelHandlerContext ctx, Channel channel, Object 
obj) throws Exception {
+    if (obj instanceof MessageBatch) {
+      return ((MessageBatch) obj).buffer();
+    }
+
+    throw new RuntimeException("Unsupported encoding of object of class " + 
obj.getClass().getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java
 
b/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java
new file mode 100644
index 0000000..270a358
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NettyRenameThreadFactory implements ThreadFactory {
+
+  static {
+    //Rename Netty threads
+    
ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+  }
+
+  final ThreadGroup group;
+  final AtomicInteger index = new AtomicInteger(1);
+  final String name;
+
+  NettyRenameThreadFactory(String name) {
+    SecurityManager s = System.getSecurityManager();
+    group = (s != null) ? s.getThreadGroup() :
+      Thread.currentThread().getThreadGroup();
+    this.name = name;
+  }
+
+  public Thread newThread(Runnable r) {
+    Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+    if (t.isDaemon())
+      t.setDaemon(false);
+    if (t.getPriority() != Thread.NORM_PRIORITY)
+      t.setPriority(Thread.NORM_PRIORITY);
+    return t;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java 
b/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java
new file mode 100644
index 0000000..beb5b0d
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+public class TaskMessage {
+
+  // When network partition happen, there may be several task instances of
+  // same taskId co-existing for a short period of time. When they send 
messages
+  // to same target task, it may cause confusion.
+  // With sessionId, we can know which messages are from an old session, and 
which
+  // are from new session. Messages of old sesson will be dropped.
+
+  private int _sessionId;
+  private long _targetTask;
+  private long _sourceTask;
+  private Object _message;
+
+  public TaskMessage(int sessionId, long targetTask, long sourceTask, Object 
message) {
+    _sessionId = sessionId;
+    _targetTask = targetTask;
+    _sourceTask = sourceTask;
+    _message = message;
+  }
+
+  public int sessionId() {
+    return _sessionId;
+  }
+
+  public long targetTask() {
+    return _targetTask;
+  }
+
+  public long sourceTask() {
+    return _sourceTask;
+  }
+
+  public Object message() {
+    return _message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java
 
b/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java
new file mode 100644
index 0000000..878ae09
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/** Wrap ChannelBuffer as a DataInput */
+public class WrappedChannelBuffer implements DataInput {
+  private ChannelBuffer channelBuffer;
+
+  public WrappedChannelBuffer() {
+  }
+
+  public WrappedChannelBuffer(ChannelBuffer channelBuffer) {
+    this.channelBuffer = channelBuffer;
+  }
+
+  public void setChannelBuffer(ChannelBuffer channelBuffer) {
+    this.channelBuffer = channelBuffer;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    channelBuffer.readBytes(b);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    channelBuffer.readBytes(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    channelBuffer.skipBytes(n);
+    return n;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    return channelBuffer.readByte() != 0;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return channelBuffer.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return channelBuffer.readUnsignedByte();
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return channelBuffer.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return channelBuffer.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return channelBuffer.readChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return channelBuffer.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return channelBuffer.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return channelBuffer.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return channelBuffer.readDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    throw new UnsupportedOperationException("readLine is not supported");
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    throw new UnsupportedOperationException("readUTF is not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java 
b/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java
new file mode 100644
index 0000000..2b5a9e6
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+public class AkkaHelper {
+
+  /**
+   * Helper util to access the private[akka] system.actorFor method
+   *
+   * This is used for performance optimization, we encode the session Id
+   * in the ActorRef path. Session Id is used to identity sender Task.
+   *
+   * @param system ActorSystem
+   * @param path Relative or absolute path of this Actor System.
+   * @return Full qualified ActorRef.
+   */
+  @SuppressWarnings("deprecation")
+  public static ActorRef actorFor(ActorSystem system, String path) {
+    return system.actorFor(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java 
b/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java
new file mode 100644
index 0000000..bf1e618
--- /dev/null
+++ b/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util;
+
+import org.apache.log4j.RollingFileAppender;
+
+/**
+ * Log4j appender for to write to user specified Hadoop filesystem.
+ */
+public class HadoopFSLogAppender extends RollingFileAppender {
+  //TODO: implement Log appender on Hadoop filesystem
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java 
b/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java
new file mode 100644
index 0000000..594ccf9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util;
+
+import org.apache.log4j.RollingFileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.File;
+
+/** When log file is deleted, tried to recreate it in file system  */
+public class RecreateRollingFileAppender extends RollingFileAppender {
+
+  protected long checkFileInterval = 60L;
+  private long lastCheckTime = 0L;
+
+  @Override
+  public void append(LoggingEvent event) {
+    checkInterval();
+    super.append(event);
+  }
+
+  private void checkInterval() {
+    long currentTime = System.currentTimeMillis();
+    if ((currentTime - lastCheckTime) > (checkFileInterval * 1000)) {
+      checkLogFileExist();
+      lastCheckTime = currentTime;
+    }
+  }
+
+  private void checkLogFileExist() {
+    String fileName = super.fileName;
+    if (fileName != null) {
+      File logFile = new File(fileName);
+      if (!logFile.exists()) {
+        this.setFile(fileName);
+        this.activateOptions();
+      }
+    }
+  }
+
+  public long getCheckFileInterval() {
+    return this.checkFileInterval;
+  }
+
+  public void setCheckFileInterval(long checkFileInterval) {
+    this.checkFileInterval = checkFileInterval;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf 
b/core/src/main/resources/geardefault.conf
index 4eecf3a..19ffb5f 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -30,14 +30,14 @@ gearpump {
   ## The installation folder of gearpump
   home = ""
 
-  serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool"
+  serializer.pool = "org.apache.gearpump.serializer.FastKryoSerializerPool"
 
   ## How many slots each worker contains
   worker.slots = 1000
 
   ## The class responsable for launching the executor process.
-  ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to 
enable CGroup support.
-  worker.executor-process-launcher = 
"io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
+  ## User can switch to 
"org.apache.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup 
support.
+  worker.executor-process-launcher = 
"org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
   ## Number of executors to launch when starting an application
   application.executor-num = 1
@@ -57,7 +57,7 @@ gearpump {
   master-resource-manager-container-id = ""
 
   ## To enable worker use cgroup to make resource isolation,
-  ## set gearpump.worker.executor-process-launcher = 
"io.gearpump.cluster.worker.CGroupProcessLauncher"
+  ## set gearpump.worker.executor-process-launcher = 
"org.apache.gearpump.cluster.worker.CGroupProcessLauncher"
   ##
   ## Before enable it, you should also make sure:
   ##   1. Linux version (>= 2.6.18)
@@ -176,7 +176,7 @@ gearpump {
       ### Whitelist for Metrics Aggregator class.
       ### See class [[MetricsAggregator]] for more information.
       metrics-aggregator-class {
-        ## Format io.gearpump.KeyFullClassName = ""
+        ## Format org.apache.gearpump.KeyFullClassName = ""
       }
     }
   }
@@ -219,9 +219,9 @@ gearpump {
 
   ### Gearpump has built-in serialization framework using Kryo.
   ### User are allowed to use a different serialization framework, like 
Protobuf
-  ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how
+  ### See [org.apache.gearpump.serializer.FastKryoSerializationFramework] to 
find how
   ### a custom serialization framework can be defined.
-  serialization-framework = 
"io.gearpump.serializer.FastKryoSerializationFramework"
+  serialization-framework = 
"org.apache.gearpump.serializer.FastKryoSerializationFramework"
 
   ### Define where the submitted jar file will be stored at
 
@@ -238,7 +238,7 @@ gearpump {
   ### If you don't know what is this about, don't change it
   #########################
   scheduling {
-    scheduler-class = "io.gearpump.cluster.scheduler.PriorityScheduler"
+    scheduler-class = "org.apache.gearpump.cluster.scheduler.PriorityScheduler"
   }
 
   #############################################
@@ -396,15 +396,15 @@ gearpump-ui {
     ## authentication channel like OAuth2.
     ##
     ## User can replace this with a custom User-Password based authenticator,
-    ## which implements interface io.gearpump.security.Authenticator
+    ## which implements interface org.apache.gearpump.security.Authenticator
     ##
-    authenticator = "io.gearpump.security.ConfigFileBasedAuthenticator"
+    authenticator = "org.apache.gearpump.security.ConfigFileBasedAuthenticator"
 
-    ## Configuration options for authenticator 
io.gearpump.security.ConfigFileBasedAuthenticator
+    ## Configuration options for authenticator 
org.apache.gearpump.security.ConfigFileBasedAuthenticator
     config-file-based-authenticator = {
       ## Format: username = "password_hash_value"
       ## password_hash_value can be generated by running shell tool:
-      ## bin/gear io.gearpump.security.PasswordUtil -password <your raw 
password>
+      ## bin/gear org.apache.gearpump.security.PasswordUtil -password <your 
raw password>
 
       ## Admin users have super permission to do everything
       admins = {
@@ -456,7 +456,7 @@ gearpump-ui {
       ## For steps to enable OAuth2 Authentication on Google, please view 
docs/deployment-ui-authentication.md
       ##
       "google" {
-        "class" = 
"io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator"
+        "class" = 
"org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator"
 
         ## Please replace "127.0.0.1:8090" with your address of UI service.
         "callback" = "http://127.0.0.1:8090/login/oauth2/google/callback";
@@ -487,7 +487,7 @@ gearpump-ui {
       ## For steps to enable OAuth2 Authentication for UAA, please view 
docs/deployment-ui-authentication.md
       ##
       "cloudfoundryuaa" {
-        "class" = 
"io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator"
+        "class" = 
"org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator"
 
         ## Please replace "127.0.0.1:8090" with your address of UI service.
         "callback" = 
"http://127.0.0.1:8090/login/oauth2/cloudfoundryuaa/callback";
@@ -525,7 +525,7 @@ gearpump-ui {
         ## Define how to do additional authorization check. The class should 
implement
         ## interface CloudFoundryUAAOAuth2Authenticator.AdditionalAuthenticator
         additional-authenticator = {
-          "class" = 
"io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator$OrganizationAccessChecker"
+          "class" = 
"org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator$OrganizationAccessChecker"
 
           ## Please fill the Cloud Foundry API endpoint and organization GUID
           "organization-url" = "http://<cloud foundry api 
endpoint>/v2/organizations/<organization-guid>"
@@ -593,8 +593,8 @@ akka {
   daemonic = on
 
   extensions = [
-    "io.gearpump.transport.Express$",
-    "io.gearpump.metrics.Metrics$"
+    "org.apache.gearpump.transport.Express$",
+    "org.apache.gearpump.metrics.Metrics$"
   ]
   loglevel = "INFO"
   loggers = ["akka.event.slf4j.Slf4jLogger"]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/log4j.properties 
b/core/src/main/resources/log4j.properties
index 682a11b..cbe0749 100644
--- a/core/src/main/resources/log4j.properties
+++ b/core/src/main/resources/log4j.properties
@@ -61,7 +61,7 @@ log4j.threshhold=ALL
 # Rolling File Appender
 #
 #log4j.appender.RollingFileAppender=org.apache.log4j.RollingFileAppender
-log4j.appender.RollingFileAppender=io.gearpump.util.RecreateRollingFileAppender
+log4j.appender.RollingFileAppender=org.apache.gearpump.util.RecreateRollingFileAppender
 
log4j.appender.RollingFileAppender.File=${gearpump.log.dir}/${gearpump.log.file}
 log4j.appender.RollingFileAppender.checkFileInterval=60
 log4j.appender.RollingFileAppender.layout=org.apache.log4j.PatternLayout
@@ -86,7 +86,7 @@ log4j.appender.console.layout.ConversionPattern=[%p] 
[%d{MM/dd/yyyy HH:mm:ss.SSS
 #
 # Application Log Appender
 #
-log4j.appender.ApplicationLogAppender=io.gearpump.util.RecreateRollingFileAppender
+log4j.appender.ApplicationLogAppender=org.apache.gearpump.util.RecreateRollingFileAppender
 
log4j.appender.ApplicationLogAppender.File=${gearpump.application.log.dir}/${gearpump.application.log.file}
 log4j.appender.ApplicationLogAppender.checkFileInterval=60
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/Message.scala 
b/core/src/main/scala/io/gearpump/Message.scala
deleted file mode 100644
index aec8089..0000000
--- a/core/src/main/scala/io/gearpump/Message.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump
-
-/**
- * Each message contains a immutable timestamp.
- *
- * For example, if you take a picture, the time you take the picture is the
- * message's timestamp.
- * @param msg Accept any type except Null, Nothing and Unit
- */
-case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
-
-object Message {
-  val noTimeStamp: TimeStamp = 0L
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala 
b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
deleted file mode 100644
index 799c20a..0000000
--- a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import scala.reflect.ClassTag
-
-import akka.actor.{Actor, ActorRef, ActorSystem}
-import com.typesafe.config.{Config, ConfigFactory}
-
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.jarstore.FilePath
-
-/**
- * This contains all information to run an application
- *
- * @param name The name of this application
- * @param appMaster The class name of AppMaster Actor
- * @param userConfig user configuration.
- * @param clusterConfig User provided cluster config, it overrides gear.conf 
when starting
- *                      new applications. In most cases, you should not need 
to change it. If you do
- *                      really need to change it, please use 
ClusterConfigSource(filePath) to
- *                      construct the object, while filePath points to the 
.conf file.
- */
-case class AppDescription(
-    name: String, appMaster: String, userConfig: UserConfig,
-    clusterConfig: Config = ConfigFactory.empty())
-
-/**
- * Each job, streaming or not streaming, need to provide an Application class.
- * The master uses this class to start AppMaster.
- */
-trait Application {
-
-  /** Name of this application, must be unique in the system */
-  def name: String
-
-  /** Custom user configuration  */
-  def userConfig(implicit system: ActorSystem): UserConfig
-
-  /**
-   * AppMaster class, must have a constructor like this:
-   * this(appContext: AppMasterContext, app: AppDescription)
-   */
-  def appMaster: Class[_ <: ApplicationMaster]
-}
-
-object Application {
-  def apply[T <: ApplicationMaster](
-      name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): 
Application = {
-    new DefaultApplication(name, userConfig,
-      tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
-  }
-
-  class DefaultApplication(
-      override val name: String, inputUserConfig: UserConfig,
-      val appMaster: Class[_ <: ApplicationMaster]) extends Application {
-    override def userConfig(implicit system: ActorSystem): UserConfig = 
inputUserConfig
-  }
-
-  def ApplicationToAppDescription(app: Application)(implicit system: 
ActorSystem)
-    : AppDescription = {
-    val filterJvmReservedKeys = 
ClusterConfig.filterOutDefaultConfig(system.settings.config)
-    AppDescription(app.name, app.appMaster.getName, app.userConfig, 
filterJvmReservedKeys)
-  }
-}
-
-/**
- * Used for verification. All AppMaster must extend this interface
- */
-abstract class ApplicationMaster extends Actor
-
-/**
- * This contains context information when starting an AppMaster
- *
- * @param appId application instance id assigned, it is unique in the cluster
- * @param username The username who submitted this application
- * @param resource Resouce allocated to start this AppMaster daemon. AppMaster 
are allowed to
- *                 request more resource from Master.
- * @param appJar application Jar. If the jar is already in classpath, then it 
can be None.
- * @param masterProxy The proxy to master actor, it bridges the messages 
between appmaster
- *                    and master
- * @param registerData AppMaster are required to send this data to Master by 
when doing
- *                     RegisterAppMaster.
- */
-case class AppMasterContext(
-    appId: Int,
-    username: String,
-    resource: Resource,
-    workerInfo: WorkerInfo,
-    appJar: Option[AppJar],
-    masterProxy: ActorRef,
-    registerData: AppMasterRegisterData)
-
-/**
- * Jar file container in the cluster
- *
- * @param name A meaningful name to represent this jar
- * @param filePath Where the jar file is stored.
- */
-case class AppJar(name: String, filePath: FilePath)
-
-/**
- * Serves as the context to start an Executor JVM.
- */
-// TODO: ExecutorContext doesn't belong to this package in logic.
-case class ExecutorContext(
-    executorId: Int, worker: WorkerInfo, appId: Int, appName: String,
-    appMaster: ActorRef, resource: Resource)
-
-/**
- * JVM configurations to start an Executor JVM.
- *
- * @param classPath When executor is created by a worker JVM, executor 
automatically inherits
- *                  parent worker's classpath. Sometimes, you still want to 
add some extra
- *                  classpath, you can do this by specify classPath option.
- * @param jvmArguments java arguments like -Dxx=yy
- * @param mainClass Executor main class name like io.gearpump.xx.AppMaster
- * @param arguments Executor command line arguments
- * @param jar application jar
- * @param executorAkkaConfig Akka config used to initialize the actor system 
of this executor.
- *                           It uses 
io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE
- *                           to pass the config to executor process
- */
-// TODO: ExecutorContext doesn't belong to this package in logic.
-case class ExecutorJVMConfig(
-    classPath: Array[String], jvmArguments: Array[String], mainClass: String,
-    arguments: Array[String], jar: Option[AppJar], username: String,
-    executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala 
b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
deleted file mode 100644
index 5cc49e7..0000000
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import java.io.File
-
-import com.typesafe.config._
-
-import io.gearpump.util.Constants._
-import io.gearpump.util.{Constants, FileUtils, LogUtil, Util}
-
-/**
- *
- * All Gearpump application should use this class to load configurations.
- *
- * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class 
also
- * resolve config from file gear.conf and geardefault.conf.
- *
- * Overriding order:
- * {{{
- *   System Properties
- *     > Custom configuration file (by using system property 
-Dgearpump.config.file) >
- *     > gear.conf
- *     > geardefault.conf
- *     > reference.conf
- * }}}
- */
-
-object ClusterConfig {
-  /**
-   * alias for default
-   * default is a reserved word for java
-   * @return
-   */
-  def defaultConfig: Config = {
-    default(APPLICATION)
-  }
-
-  /**
-   * default application for user.
-   * Usually used when user want to start an client application.
-   * @return
-   */
-  def default(configFile: String = APPLICATION): Config = {
-    load(configFile).default
-  }
-
-  /**
-   * configuration for master node
-   * @return
-   */
-  def master(configFile: String = null): Config = {
-    load(configFile).master
-  }
-
-  /*
-   * configuration for worker node
-   */
-  def worker(configFile: String = null): Config = {
-    load(configFile).worker
-  }
-
-  /**
-   * configuration for UI server
-   * @return
-   */
-  def ui(configFile: String = null): Config = {
-    load(configFile).ui
-  }
-
-  /**
-   * try to load system property gearpump.config.file, or use configFile
-   */
-  private def load(configFile: String): Configs = {
-    val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE))
-    file match {
-      case Some(path) =>
-        LOG.info("loading config file " + path + "..........")
-        load(ClusterConfigSource(path))
-      case None =>
-        LOG.info("loading config file application.conf...")
-        load(ClusterConfigSource(configFile))
-    }
-  }
-
-  val APPLICATION = "application.conf"
-  val LOG = LogUtil.getLogger(getClass)
-
-  def saveConfig(conf: Config, file: File): Unit = {
-    val serialized = conf.root().render()
-    FileUtils.write(file, serialized)
-  }
-
-  def render(config: Config, concise: Boolean = false): String = {
-    if (concise) {
-      config.root().render(ConfigRenderOptions.concise().setFormatted(true))
-    } else {
-      config.root().render(ConfigRenderOptions.defaults())
-    }
-  }
-
-  /** filter JVM reserved keys and akka default reference.conf */
-  def filterOutDefaultConfig(input: Config): Config = {
-    val updated = filterOutJvmReservedKeys(input)
-    Util.filterOutOrigin(updated, "reference.conf")
-  }
-
-  private[gearpump] def load(source: ClusterConfigSource): Configs = {
-
-    val systemProperties = getSystemProperties
-
-    val user = source.getConfig
-
-    val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf",
-      ConfigParseOptions.defaults.setAllowMissing(true))
-
-    val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf",
-      ConfigParseOptions.defaults.setAllowMissing(true))
-
-    val all = 
systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault)
-
-    val linux = all.getConfig(LINUX_CONFIG)
-
-    var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG).
-      withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG)
-
-    if (!akka.util.Helpers.isWindows) {
-
-      // Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac
-      basic = linux.withFallback(basic)
-    }
-
-    val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic))
-    val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic))
-    val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic))
-    val app = replaceHost(basic)
-
-    new Configs(master, worker, ui, app)
-  }
-
-  private def replaceHost(config: Config): Config = {
-    val hostName = config.getString(Constants.GEARPUMP_HOSTNAME)
-    config.withValue(NETTY_TCP_HOSTNAME, 
ConfigValueFactory.fromAnyRef(hostName))
-  }
-
-  val JVM_RESERVED_PROPERTIES = List(
-    "os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file"
-  )
-
-  private def getSystemProperties: Config = {
-    // Excludes default java system properties
-    JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { 
(config, property) =>
-      config.withoutPath(property)
-    }
-  }
-
-  class ConfigValidationException(msg: String) extends Exception(msg: String)
-
-  private def filterOutJvmReservedKeys(input: Config): Config = {
-    val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { 
(config, key) =>
-      config.withoutPath(key)
-    }
-    filterJvmReservedKeys
-  }
-
-  protected class Configs(
-      val master: Config, val worker: Config, val ui: Config, val default: 
Config)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala 
b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
deleted file mode 100644
index 3a248d7..0000000
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import java.io.File
-import scala.language.implicitConversions
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-
-/**
- * Data Source of ClusterConfig
- *
- * Please use ClusterConfigSource.apply(filePath) to construct this object
- */
-sealed trait ClusterConfigSource extends Serializable {
-  def getConfig: Config
-}
-
-object ClusterConfigSource {
-
-  /**
-   * Construct ClusterConfigSource from resource name or file path
-   */
-  def apply(filePath: String): ClusterConfigSource = {
-
-    if (null == filePath) {
-      new ClusterConfigSourceImpl(ConfigFactory.empty())
-    } else {
-      var config = ConfigFactory.parseFileAnySyntax(new File(filePath),
-        ConfigParseOptions.defaults.setAllowMissing(true))
-
-      if (null == config || config.isEmpty) {
-        config = ConfigFactory.parseResourcesAnySyntax(filePath,
-          ConfigParseOptions.defaults.setAllowMissing(true))
-      }
-      new ClusterConfigSourceImpl(config)
-    }
-  }
-
-  implicit def FilePathToClusterConfigSource(filePath: String): 
ClusterConfigSource = {
-    apply(filePath)
-  }
-
-  private class ClusterConfigSourceImpl(config: Config) extends 
ClusterConfigSource {
-    override def getConfig: Config = config
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala 
b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
deleted file mode 100644
index ed187a1..0000000
--- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import scala.util.Try
-
-import akka.actor.ActorRef
-import com.typesafe.config.Config
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import io.gearpump.cluster.master.{MasterNode, MasterSummary}
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
-import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import io.gearpump.metrics.Metrics.MetricType
-
-object ClientToMaster {
-  case object AddMaster
-  case class AddWorker(count: Int)
-  case class RemoveMaster(masterContainerId: String)
-  case class RemoveWorker(workerContainerId: String)
-
-  /** Command result of AddMaster, RemoveMaster, and etc... */
-  case class CommandResult(success: Boolean, exception: String = null) {
-    override def toString: String = {
-      val tag = getClass.getSimpleName
-      if (success) {
-        s"$tag(success)"
-      } else {
-        s"$tag(failure, $exception)"
-      }
-    }
-  }
-
-  /** Submit an application to master */
-  case class SubmitApplication(
-      appDescription: AppDescription, appJar: Option[AppJar],
-      username: String = System.getProperty("user.name"))
-
-  case class RestartApplication(appId: Int)
-  case class ShutdownApplication(appId: Int)
-
-  /** Client send ResolveAppId to Master to resolves AppMaster actor path by 
providing appId */
-  case class ResolveAppId(appId: Int)
-
-  /** Client send ResolveWorkerId to master to get the Actor path of worker. */
-  case class ResolveWorkerId(workerId: WorkerId)
-
-  /** Get an active Jar store to upload job jars, like wordcount.jar */
-  case object GetJarStoreServer
-
-  /** Service address of JarStore */
-  case class JarStoreServerAddress(url: String)
-
-  /** Query AppMaster config by providing appId */
-  case class QueryAppMasterConfig(appId: Int)
-
-  /** Query worker config */
-  case class QueryWorkerConfig(workerId: WorkerId)
-
-  /** Query master config */
-  case object QueryMasterConfig
-
-  /** Options for read the metrics from the cluster */
-  object ReadOption {
-    type ReadOption = String
-
-    val Key: String = "readOption"
-
-    /** Read the latest record of the metrics, only return 1 record for one 
metric name (id) */
-    val ReadLatest: ReadOption = "readLatest"
-
-    /** Read recent metrics from cluster, typically it contains metrics in 5 
minutes */
-    val ReadRecent = "readRecent"
-
-    /**
-     * Read the history metrics, typically it contains metrics for 48 hours
-     *
-     * NOTE: Each hour only contain one or two data points.
-     */
-    val ReadHistory = "readHistory"
-  }
-
-  /** Query history metrics from master or app master. */
-  case class QueryHistoryMetrics(
-      path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest,
-      aggregatorClazz: String = "", options: Map[String, String] = 
Map.empty[String, String])
-
-  /**
-   * If there are message loss, the clock would pause for a while. This 
message is used to
-   * pin-point which task has stalling clock value, and usually it means 
something wrong on
-   * that machine.
-   */
-  case class GetStallingTasks(appId: Int)
-
-  /**
-   * Request app master for a short list of cluster app that administrators 
should be aware of.
-   */
-  case class GetLastFailure(appId: Int)
-}
-
-object MasterToClient {
-
-  /** Result of SubmitApplication */
-  // TODO: Merge with SubmitApplicationResultValue and change this to (appId: 
Option, ex: Exception)
-  case class SubmitApplicationResult(appId: Try[Int])
-
-  case class SubmitApplicationResultValue(appId: Int)
-
-  case class ShutdownApplicationResult(appId: Try[Int])
-  case class ReplayApplicationResult(appId: Try[Int])
-
-  /** Return Actor ref of app master */
-  case class ResolveAppIdResult(appMaster: Try[ActorRef])
-
-  /** Return Actor ref of worker */
-  case class ResolveWorkerIdResult(worker: Try[ActorRef])
-
-  case class AppMasterConfig(config: Config)
-
-  case class WorkerConfig(config: Config)
-
-  case class MasterConfig(config: Config)
-
-  case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
-
-  /**
-   * History metrics returned from master, worker, or app master.
-   *
-   * All metric items are organized like a tree, path is used to navigate 
through the tree.
-   * For example, when querying with path == "executor0.task1.throughput*", 
the metrics
-   * provider picks metrics whose source matches the path.
-   *
-   * @param path The path client provided. The returned metrics are the result 
query of this path.
-   * @param metrics The detailed metrics.
-   */
-  case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
-
-  /** Return the last error of this streaming application job */
-  case class LastFailure(time: TimeStamp, error: String)
-}
-
-trait AppMasterRegisterData
-
-object AppMasterToMaster {
-
-  /**
-   * Register an AppMaster by providing a ActorRef, and registerData
-   * @param registerData The registerData is provided by Master when starting 
the app master.
-   *                     App master should return the registerData back to 
master.
-   *                     Typically registerData hold some context information 
for this app Master.
-   */
-
-  case class RegisterAppMaster(appMaster: ActorRef, registerData: 
AppMasterRegisterData)
-
-  case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
-
-  case class RequestResource(appId: Int, request: ResourceRequest)
-
-  /**
-   * Each application job can save some data in the distributed cluster 
storage on master nodes.
-   *
-   * @param appId App Id of the client application who send the request.
-   * @param key Key name
-   * @param value Value to store on distributed cluster storage on master nodes
-   */
-  case class SaveAppData(appId: Int, key: String, value: Any)
-
-  /** The application specific data is successfully stored */
-  case object AppDataSaved
-
-  /** Fail to store the application data */
-  case object SaveAppDataFailed
-
-  /** Fetch the application specific data that stored previously */
-  case class GetAppData(appId: Int, key: String)
-
-  /** The KV data returned for query GetAppData */
-  case class GetAppDataResult(key: String, value: Any)
-
-  /**
-   * AppMasterSummary returned to REST API query. Streaming and Non-streaming
-   * have very different application info. AppMasterSummary is the common 
interface.
-   */
-  trait AppMasterSummary {
-    def appType: String
-    def appId: Int
-    def appName: String
-    def actorPath: String
-    def status: AppMasterStatus
-    def startTime: TimeStamp
-    def uptime: TimeStamp
-    def user: String
-  }
-
-  /** Represents a generic application that is not a streaming job */
-  case class GeneralAppMasterSummary(
-      appId: Int,
-      appType: String = "general",
-      appName: String = null,
-      actorPath: String = null,
-      status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
-      startTime: TimeStamp = 0L,
-      uptime: TimeStamp = 0L,
-      user: String = null)
-    extends AppMasterSummary
-
-  /** Fetches the list of workers from Master */
-  case object GetAllWorkers
-
-  /** Get worker data of workerId */
-  case class GetWorkerData(workerId: WorkerId)
-
-  /** Response to GetWorkerData */
-  case class WorkerData(workerDescription: WorkerSummary)
-
-  /** Get Master data */
-  case object GetMasterData
-
-  /** Response to GetMasterData */
-  case class MasterData(masterDescription: MasterSummary)
-}
-
-object MasterToAppMaster {
-
-  /** Resource allocated for application xx */
-  case class ResourceAllocated(allocations: Array[ResourceAllocation])
-
-  /** Master confirm reception of RegisterAppMaster message */
-  case class AppMasterRegistered(appId: Int)
-
-  /** Shutdown the application job */
-  case object ShutdownAppMaster
-
-  type AppMasterStatus = String
-  val AppMasterActive: AppMasterStatus = "active"
-  val AppMasterInActive: AppMasterStatus = "inactive"
-  val AppMasterNonExist: AppMasterStatus = "nonexist"
-
-  sealed trait StreamingType
-  case class AppMasterData(
-      status: AppMasterStatus, appId: Int = 0, appName: String = null, 
appMasterPath: String = null,
-      workerPath: String = null, submissionTime: TimeStamp = 0, startTime: 
TimeStamp = 0,
-      finishTime: TimeStamp = 0, user: String = null)
-
-  case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
-
-  case class AppMastersData(appMasters: List[AppMasterData])
-  case object AppMastersDataRequest
-  case class AppMasterDataDetailRequest(appId: Int)
-  case class AppMasterMetricsRequest(appId: Int) extends StreamingType
-
-  case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
-
-  case class WorkerList(workers: List[WorkerId])
-}
-
-object AppMasterToWorker {
-  case class LaunchExecutor(
-      appId: Int, executorId: Int, resource: Resource, executorJvmConfig: 
ExecutorJVMConfig)
-
-  case class ShutdownExecutor(appId: Int, executorId: Int, reason: String)
-  case class ChangeExecutorResource(appId: Int, executorId: Int, resource: 
Resource)
-}
-
-object WorkerToAppMaster {
-  case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = 
null)
-  case class ShutdownExecutorSucceed(appId: Int, executorId: Int)
-  case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = 
null)
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala 
b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
deleted file mode 100644
index 61de1dd..0000000
--- a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import akka.serialization.JavaSerializer
-
-import io.gearpump.google.common.io.BaseEncoding
-
-/**
- * Immutable configuration
- */
-final class UserConfig(private val _config: Map[String, String]) extends 
Serializable {
-
-  def withBoolean(key: String, value: Boolean): UserConfig = {
-    new UserConfig(_config + (key -> value.toString))
-  }
-
-  def withDouble(key: String, value: Double): UserConfig = {
-    new UserConfig(_config + (key -> value.toString))
-  }
-
-  def withFloat(key: String, value: Float): UserConfig = {
-    new UserConfig(_config + (key -> value.toString))
-  }
-
-  def withInt(key: String, value: Int): UserConfig = {
-    new UserConfig(_config + (key -> value.toString))
-  }
-
-  def withLong(key: String, value: Long): UserConfig = {
-    new UserConfig(_config + (key -> value.toString))
-  }
-
-  def withString(key: String, value: String): UserConfig = {
-    if (null == value) {
-      this
-    } else {
-      new UserConfig(_config + (key -> value))
-    }
-  }
-
-  def without(key: String): UserConfig = {
-    val config = _config - key
-    new UserConfig(config)
-  }
-
-  def filter(p: ((String, String)) => Boolean): UserConfig = {
-    val updated = _config.filter(p)
-    new UserConfig(updated)
-  }
-
-  def getBoolean(key: String): Option[Boolean] = {
-    _config.get(key).map(_.toBoolean)
-  }
-
-  def getDouble(key: String): Option[Double] = {
-    _config.get(key).map(_.toDouble)
-  }
-
-  def getFloat(key: String): Option[Float] = {
-    _config.get(key).map(_.toFloat)
-  }
-
-  def getInt(key: String): Option[Int] = {
-    _config.get(key).map(_.toInt)
-  }
-
-  def getLong(key: String): Option[Long] = {
-    _config.get(key).map(_.toLong)
-  }
-
-  def getString(key: String): Option[String] = {
-    _config.get(key)
-  }
-
-  def getBytes(key: String): Option[Array[Byte]] = {
-    _config.get(key).map(BaseEncoding.base64().decode(_))
-  }
-
-  def withBytes(key: String, value: Array[Byte]): UserConfig = {
-    if (null == value) {
-      this
-    } else {
-      this.withString(key, BaseEncoding.base64().encode(value))
-    }
-  }
-
-  // scalastyle:off line.size.limit
-  /**
-   * This de-serializes value to object instance
-   *
-   * To do de-serialization, this requires an implicit ActorSystem, as
-   * the ActorRef and possibly other akka classes deserialization
-   * requires an implicit ActorSystem.
-   *
-   * See Link:
-   * 
http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
-   */
-
-  def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
-    val serializer = new 
JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
-    _config.get(key).map(BaseEncoding.base64().decode(_))
-      .map(serializer.fromBinary(_).asInstanceOf[T])
-  }
-
-  /**
-   * This serializes the object and store it as string.
-   *
-   * To do serialization, this requires an implicit ActorSystem, as
-   * the ActorRef and possibly other akka classes serialization
-   * requires an implicit ActorSystem.
-   *
-   * See Link:
-   * 
http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
-   */
-  def withValue[T <: AnyRef](key: String, value: T)(implicit system: 
ActorSystem): UserConfig = {
-
-    if (null == value) {
-      this
-    } else {
-      val serializer = new 
JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
-      val bytes = serializer.toBinary(value)
-      val encoded = BaseEncoding.base64().encode(bytes)
-      this.withString(key, encoded)
-    }
-  }
-  // scalastyle:on line.size.limit
-
-  def withConfig(other: UserConfig): UserConfig = {
-    if (null == other) {
-      this
-    } else {
-      new UserConfig(_config ++ other._config)
-    }
-  }
-}
-
-object UserConfig {
-
-  def empty: UserConfig = new UserConfig(Map.empty[String, String])
-
-  def apply(config: Map[String, String]): UserConfig = new UserConfig(config)
-
-  def unapply(config: UserConfig): Option[Map[String, String]] = 
Option(config._config)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
 
b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
deleted file mode 100644
index 4e8582b..0000000
--- 
a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import akka.actor._
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, 
StartExecutorSystems}
-import 
io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.cluster.{AppDescription, AppMasterContext}
-import io.gearpump.util.LogUtil
-
-/**
- * This serves as runtime environment for AppMaster.
- * When starting an AppMaster, we need to setup the connection to master,
- * and prepare other environments.
- *
- * This also extend the function of Master, by providing a scheduler service 
for Executor System.
- * AppMaster can ask Master for executor system directly. details like 
requesting resource,
- * contacting worker to start a process, and then starting an executor system 
is hidden from
- * AppMaster.
- *
- * Please use AppMasterRuntimeEnvironment.props() to construct this actor.
- */
-private[appmaster]
-class AppMasterRuntimeEnvironment(
-    appContextInput: AppMasterContext,
-    app: AppDescription,
-    masters: Iterable[ActorPath],
-    masterFactory: (AppId, MasterActorRef) => Props,
-    appMasterFactory: (AppMasterContext, AppDescription) => Props,
-    masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, 
ListenerActorRef) => Props)
-  extends Actor {
-
-  val appId = appContextInput.appId
-  private val LOG = LogUtil.getLogger(getClass, app = appId)
-
-  import scala.concurrent.duration._
-
-  private val master = context.actorOf(
-    masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 
30.seconds)))))
-  private val appContext = appContextInput.copy(masterProxy = master)
-
-  // Create appMaster proxy to receive command and forward to appmaster
-  private val appMaster = context.actorOf(appMasterFactory(appContext, app))
-  context.watch(appMaster)
-
-  private val registerAppMaster = RegisterAppMaster(appMaster, 
appContext.registerData)
-  private val masterConnectionKeeper = context.actorOf(
-    masterConnectionKeeperFactory(master, registerAppMaster, self))
-  context.watch(masterConnectionKeeper)
-
-  def receive: Receive = {
-    case MasterConnected =>
-      LOG.info(s"Master is connected, start AppMaster ${appId}...")
-      appMaster ! StartAppMaster
-    case MasterStopped =>
-      LOG.error(s"Master is stopped, stop AppMaster ${appId}...")
-      context.stop(self)
-    case Terminated(actor) => actor match {
-      case `appMaster` =>
-        LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
-        context.stop(self)
-      case `masterConnectionKeeper` =>
-        LOG.error(s"Master connection keeper is stopped, appId: ${appId}, 
shutdown myself")
-        context.stop(self)
-      case _ => // Skip
-    }
-  }
-}
-
-object AppMasterRuntimeEnvironment {
-
-  def props(
-      masters: Iterable[ActorPath], app: AppDescription, appContextInput: 
AppMasterContext)
-    : Props = {
-
-    val master = (appId: AppId, masterProxy: MasterActorRef) =>
-      MasterWithExecutorSystemProvider.props(appId, masterProxy)
-
-    val appMaster = (appContext: AppMasterContext, app: AppDescription) =>
-      LazyStartAppMaster.props(appContext, app)
-
-    val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster:
-      RegisterAppMaster, listener: ListenerActorRef) => Props(new 
MasterConnectionKeeper(
-        registerAppMaster, master, masterStatusListener = listener))
-
-    Props(new AppMasterRuntimeEnvironment(
-      appContextInput, app, masters, master, appMaster, 
masterConnectionKeeper))
-  }
-
-  /**
-   * This behavior like a AppMaster. Under the hood, It start start the real 
AppMaster in a lazy
-   * way. When real AppMaster is not started yet, all messages are stashed. 
The stashed
-   * messages are forwarded to real AppMaster when the real AppMaster is 
started.
-   *
-   * Please use LazyStartAppMaster.props to construct this actor
-   *
-   * @param appMasterProps  underlying AppMaster Props
-   */
-  private[appmaster]
-  class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor 
with Stash {
-
-    private val LOG = LogUtil.getLogger(getClass, app = appId)
-
-    def receive: Receive = null
-
-    context.become(startAppMaster)
-
-    def startAppMaster: Receive = {
-      case StartAppMaster =>
-        val appMaster = context.actorOf(appMasterProps, "appmaster")
-        context.watch(appMaster)
-        context.become(terminationWatch(appMaster) orElse 
appMasterService(appMaster))
-        unstashAll()
-      case _ =>
-        stash()
-    }
-
-    def terminationWatch(appMaster: ActorRef): Receive = {
-      case Terminated(appMaster) =>
-        LOG.error("appmaster is stopped")
-        context.stop(self)
-    }
-
-    def appMasterService(appMaster: ActorRef): Receive = {
-      case msg => appMaster forward msg
-    }
-  }
-
-  private[appmaster]
-  object LazyStartAppMaster {
-    def props(appContext: AppMasterContext, app: AppDescription): Props = {
-      val appMasterProps = Props(Class.forName(app.appMaster), appContext, app)
-      Props(new LazyStartAppMaster(appContext.appId, appMasterProps))
-    }
-  }
-
-  private[appmaster] case object StartAppMaster
-
-  /**
-   * This enhance Master by providing new service: StartExecutorSystems
-   *
-   * Please use MasterWithExecutorSystemProvider.props to construct this actor
-   *
-   */
-  private[appmaster]
-  class MasterWithExecutorSystemProvider(master: ActorRef, 
executorSystemProviderProps: Props)
-    extends Actor {
-
-    val executorSystemProvider: ActorRef = 
context.actorOf(executorSystemProviderProps)
-
-    override def receive: Receive = {
-      case request: StartExecutorSystems =>
-        executorSystemProvider forward request
-      case msg =>
-        master forward msg
-    }
-  }
-
-  private[appmaster]
-  object MasterWithExecutorSystemProvider {
-    def props(appId: Int, master: ActorRef): Props = {
-
-      val executorSystemLauncher = (appId: Int, session: Session) =>
-        Props(new ExecutorSystemLauncher(appId, session))
-
-      val scheduler = Props(new ExecutorSystemScheduler(appId, master, 
executorSystemLauncher))
-
-      Props(new MasterWithExecutorSystemProvider(master, scheduler))
-    }
-  }
-
-  private[appmaster] type AppId = Int
-  private[appmaster] type MasterActorRef = ActorRef
-  private[appmaster] type ListenerActorRef = ActorRef
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala 
b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
deleted file mode 100644
index 5b3e0c5..0000000
--- 
a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import akka.actor.ActorRef
-import com.typesafe.config.Config
-
-import io.gearpump._
-import io.gearpump.cluster.AppMasterRegisterData
-
-/** Run time info used to start an AppMaster */
-case class AppMasterRuntimeInfo(
-    appId: Int,
-    // AppName is the unique Id for an application
-    appName: String,
-    worker: ActorRef = null,
-    user: String = null,
-    submissionTime: TimeStamp = 0,
-    startTime: TimeStamp = 0,
-    finishTime: TimeStamp = 0,
-    config: Config = null)
-  extends AppMasterRegisterData


Reply via email to