http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java b/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java deleted file mode 100644 index 3531f3b..0000000 --- a/core/src/main/java/io/gearpump/transport/netty/WrappedChannelBuffer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.transport.netty; - -import org.jboss.netty.buffer.ChannelBuffer; - -import java.io.DataInput; -import java.io.IOException; - -/** Wrap ChannelBuffer as a DataInput */ -public class WrappedChannelBuffer implements DataInput { - private ChannelBuffer channelBuffer; - - public WrappedChannelBuffer() { - } - - public WrappedChannelBuffer(ChannelBuffer channelBuffer) { - this.channelBuffer = channelBuffer; - } - - public void setChannelBuffer(ChannelBuffer channelBuffer) { - this.channelBuffer = channelBuffer; - } - - @Override - public void readFully(byte[] b) throws IOException { - channelBuffer.readBytes(b); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - channelBuffer.readBytes(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - channelBuffer.skipBytes(n); - return n; - } - - @Override - public boolean readBoolean() throws IOException { - return channelBuffer.readByte() != 0; - } - - @Override - public byte readByte() throws IOException { - return channelBuffer.readByte(); - } - - @Override - public int readUnsignedByte() throws IOException { - return channelBuffer.readUnsignedByte(); - } - - @Override - public short readShort() throws IOException { - return channelBuffer.readShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - return channelBuffer.readUnsignedShort(); - } - - @Override - public char readChar() throws IOException { - return channelBuffer.readChar(); - } - - @Override - public int readInt() throws IOException { - return channelBuffer.readInt(); - } - - @Override - public long readLong() throws IOException { - return channelBuffer.readLong(); - } - - @Override - public float readFloat() throws IOException { - return channelBuffer.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return channelBuffer.readDouble(); - } - - @Override - public String readLine() throws IOException { - throw new UnsupportedOperationException("readLine is not supported"); - } - - @Override - public String readUTF() throws IOException { - throw new UnsupportedOperationException("readUTF is not supported"); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/AkkaHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/util/AkkaHelper.java b/core/src/main/java/io/gearpump/util/AkkaHelper.java deleted file mode 100644 index 286fabf..0000000 --- a/core/src/main/java/io/gearpump/util/AkkaHelper.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; - -public class AkkaHelper { - - /** - * Helper util to access the private[akka] system.actorFor method - * - * This is used for performance optimization, we encode the session Id - * in the ActorRef path. Session Id is used to identity sender Task. - * - * @param system ActorSystem - * @param path Relative or absolute path of this Actor System. - * @return Full qualified ActorRef. - */ - @SuppressWarnings("deprecation") - public static ActorRef actorFor(ActorSystem system, String path) { - return system.actorFor(path); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java deleted file mode 100644 index 93b65e8..0000000 --- a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util; - -import org.apache.log4j.RollingFileAppender; - -/** - * Log4j appender for to write to user specified Hadoop filesystem. - */ -public class HadoopFSLogAppender extends RollingFileAppender { - //TODO: implement Log appender on Hadoop filesystem -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java b/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java deleted file mode 100644 index acbd1cb..0000000 --- a/core/src/main/java/io/gearpump/util/RecreateRollingFileAppender.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util; - -import org.apache.log4j.RollingFileAppender; -import org.apache.log4j.spi.LoggingEvent; - -import java.io.File; - -/** When log file is deleted, tried to recreate it in file system */ -public class RecreateRollingFileAppender extends RollingFileAppender { - - protected long checkFileInterval = 60L; - private long lastCheckTime = 0L; - - @Override - public void append(LoggingEvent event) { - checkInterval(); - super.append(event); - } - - private void checkInterval() { - long currentTime = System.currentTimeMillis(); - if ((currentTime - lastCheckTime) > (checkFileInterval * 1000)) { - checkLogFileExist(); - lastCheckTime = currentTime; - } - } - - private void checkLogFileExist() { - String fileName = super.fileName; - if (fileName != null) { - File logFile = new File(fileName); - if (!logFile.exists()) { - this.setFile(fileName); - this.activateOptions(); - } - } - } - - public long getCheckFileInterval() { - return this.checkFileInterval; - } - - public void setCheckFileInterval(long checkFileInterval) { - this.checkFileInterval = checkFileInterval; - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java b/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java new file mode 100644 index 0000000..8d3415c --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/ITransportMessageSerializer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import java.io.DataInput; +import java.io.DataOutput; + +public interface ITransportMessageSerializer { + + int getLength(Object obj); + + void serialize(DataOutput dataOutput, Object transportMessage); + + Object deserialize(DataInput dataInput, int length); +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java b/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java new file mode 100644 index 0000000..7ee85f3 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageBatch.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import io.gearpump.google.common.io.Closeables; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferOutputStream; +import org.jboss.netty.buffer.ChannelBuffers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Netty message on the wire is wrapped as MessageBatch + */ +public class MessageBatch { + private static final Logger log = LoggerFactory.getLogger(MessageBatch.class); + + private int buffer_size; + private List<TaskMessage> messages; + private int encoded_length; + private ITransportMessageSerializer serializer; + + MessageBatch(int buffer_size, ITransportMessageSerializer serializer) { + this.buffer_size = buffer_size; + messages = new ArrayList<TaskMessage>(); + encoded_length = 0; + this.serializer = serializer; + } + + void add(TaskMessage taskMessage) { + if (taskMessage == null) { + throw new RuntimeException("null object forbidden in a message batch"); + } + + messages.add(taskMessage); + encoded_length += msgEncodeLength(taskMessage); + } + + TaskMessage get(int index) { + return messages.get(index); + } + + /** + * try to add a TaskMessage to a batch + * + * @param taskMsg - {@link org.apache.gearpump.transport.netty.TaskMessage} + * @return false if the msg could not be added due to buffer size limit; true otherwise + */ + boolean tryAdd(TaskMessage taskMsg) { + if ((encoded_length + msgEncodeLength(taskMsg)) <= buffer_size) { + add(taskMsg); + return true; + } + return false; + } + + private int msgEncodeLength(TaskMessage taskMsg) { + int size = 0; + if (taskMsg != null) { + size = 24; //sessionId(INT) + sourceTask(LONG) + targetTask(LONG) + messageLength(INT) + if (taskMsg.message() != null) { + size += serializer.getLength(taskMsg.message()); + } + } + return size; + } + + /** + * @return true, if allowed buffer is Full + */ + boolean isFull() { + return encoded_length >= buffer_size; + } + + /** + * @return true, if no messages in this batch + */ + boolean isEmpty() { + return messages.isEmpty(); + } + + /** + * @return number of messages available in this batch + */ + int size() { + return messages.size(); + } + + /** + * create a buffer containing the encoding of this batch + */ + ChannelBuffer buffer() throws IOException { + ChannelBufferOutputStream bout = + new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length)); + + try { + for (TaskMessage msg : messages) { + writeTaskMessage(bout, msg); + } + return bout.buffer(); + } catch (IOException e) { + log.error("Error while writing Tasks to Channel Buffer - {}", e.getMessage()); + } finally { + Closeables.close(bout, false); + } + return null; + } + + /** + * write a TaskMessage into a stream + * <p> + * Each TaskMessage is encoded as: + * sessionId ... int(4) + * source task ... Long(8) + * target task ... long(8) + * len ... int(4) + * payload ... byte[] * + */ + private void writeTaskMessage(ChannelBufferOutputStream bout, + TaskMessage message) throws IOException { + long target_id = message.targetTask(); + long source_id = message.sourceTask(); + int sessionId = message.sessionId(); + int msgLength = serializer.getLength(message.message()); + + bout.writeInt(sessionId); + bout.writeLong(target_id); + bout.writeLong(source_id); + bout.writeInt(msgLength); + serializer.serialize(bout, message.message()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java b/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java new file mode 100644 index 0000000..761bb38 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageDecoder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +import java.util.ArrayList; +import java.util.List; + +public class MessageDecoder extends FrameDecoder { + private ITransportMessageSerializer serializer; + private WrappedChannelBuffer dataInput = new WrappedChannelBuffer(); + + public MessageDecoder(ITransportMessageSerializer serializer) { + this.serializer = serializer; + } + + /* + * Each TaskMessage is encoded as: + * sessionId ... int(4) + * source task ... long(8) + * target task ... long(8) + * len ... int(4) + * payload ... byte[] * + */ + protected List<TaskMessage> decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buf) { + this.dataInput.setChannelBuffer(buf); + + final int SESION_LENGTH = 4; //int + final int SOURCE_TASK_LENGTH = 8; //long + final int TARGET_TASK_LENGTH = 8; //long + final int MESSAGE_LENGTH = 4; //int + final int HEADER_LENGTH = SESION_LENGTH + SOURCE_TASK_LENGTH + TARGET_TASK_LENGTH + MESSAGE_LENGTH; + + // Make sure that we have received at least a short message + long available = buf.readableBytes(); + if (available < HEADER_LENGTH) { + //need more data + return null; + } + + List<TaskMessage> taskMessageList = new ArrayList<TaskMessage>(); + + // Use while loop, try to decode as more messages as possible in single call + while (available >= HEADER_LENGTH) { + + // Mark the current buffer position before reading task/len field + // because the whole frame might not be in the buffer yet. + // We will reset the buffer position to the marked position if + // there's not enough bytes in the buffer. + buf.markReaderIndex(); + + int sessionId = buf.readInt(); + long targetTask = buf.readLong(); + long sourceTask = buf.readLong(); + // Read the length field. + int length = buf.readInt(); + + available -= HEADER_LENGTH; + + if (length <= 0) { + taskMessageList.add(new TaskMessage(sessionId, targetTask, sourceTask, null)); + break; + } + + // Make sure if there's enough bytes in the buffer. + if (available < length) { + // The whole bytes were not received yet - return null. + buf.resetReaderIndex(); + break; + } + available -= length; + + // There's enough bytes in the buffer. Read it. + Object message = serializer.deserialize(dataInput, length); + + // Successfully decoded a frame. + // Return a TaskMessage object + taskMessageList.add(new TaskMessage(sessionId, targetTask, sourceTask, message)); + } + + return taskMessageList.size() == 0 ? null : taskMessageList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java b/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java new file mode 100644 index 0000000..008c19d --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/MessageEncoder.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + +public class MessageEncoder extends OneToOneEncoder { + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception { + if (obj instanceof MessageBatch) { + return ((MessageBatch) obj).buffer(); + } + + throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java b/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java new file mode 100644 index 0000000..270a358 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/NettyRenameThreadFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.ThreadRenamingRunnable; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NettyRenameThreadFactory implements ThreadFactory { + + static { + //Rename Netty threads + ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT); + } + + final ThreadGroup group; + final AtomicInteger index = new AtomicInteger(1); + final String name; + + NettyRenameThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + this.name = name; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java b/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java new file mode 100644 index 0000000..beb5b0d --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/TaskMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +public class TaskMessage { + + // When network partition happen, there may be several task instances of + // same taskId co-existing for a short period of time. When they send messages + // to same target task, it may cause confusion. + // With sessionId, we can know which messages are from an old session, and which + // are from new session. Messages of old sesson will be dropped. + + private int _sessionId; + private long _targetTask; + private long _sourceTask; + private Object _message; + + public TaskMessage(int sessionId, long targetTask, long sourceTask, Object message) { + _sessionId = sessionId; + _targetTask = targetTask; + _sourceTask = sourceTask; + _message = message; + } + + public int sessionId() { + return _sessionId; + } + + public long targetTask() { + return _targetTask; + } + + public long sourceTask() { + return _sourceTask; + } + + public Object message() { + return _message; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java b/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java new file mode 100644 index 0000000..878ae09 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/transport/netty/WrappedChannelBuffer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.transport.netty; + +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.DataInput; +import java.io.IOException; + +/** Wrap ChannelBuffer as a DataInput */ +public class WrappedChannelBuffer implements DataInput { + private ChannelBuffer channelBuffer; + + public WrappedChannelBuffer() { + } + + public WrappedChannelBuffer(ChannelBuffer channelBuffer) { + this.channelBuffer = channelBuffer; + } + + public void setChannelBuffer(ChannelBuffer channelBuffer) { + this.channelBuffer = channelBuffer; + } + + @Override + public void readFully(byte[] b) throws IOException { + channelBuffer.readBytes(b); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + channelBuffer.readBytes(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException { + channelBuffer.skipBytes(n); + return n; + } + + @Override + public boolean readBoolean() throws IOException { + return channelBuffer.readByte() != 0; + } + + @Override + public byte readByte() throws IOException { + return channelBuffer.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + return channelBuffer.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException { + return channelBuffer.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException { + return channelBuffer.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException { + return channelBuffer.readChar(); + } + + @Override + public int readInt() throws IOException { + return channelBuffer.readInt(); + } + + @Override + public long readLong() throws IOException { + return channelBuffer.readLong(); + } + + @Override + public float readFloat() throws IOException { + return channelBuffer.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return channelBuffer.readDouble(); + } + + @Override + public String readLine() throws IOException { + throw new UnsupportedOperationException("readLine is not supported"); + } + + @Override + public String readUTF() throws IOException { + throw new UnsupportedOperationException("readUTF is not supported"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java b/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java new file mode 100644 index 0000000..2b5a9e6 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/util/AkkaHelper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +public class AkkaHelper { + + /** + * Helper util to access the private[akka] system.actorFor method + * + * This is used for performance optimization, we encode the session Id + * in the ActorRef path. Session Id is used to identity sender Task. + * + * @param system ActorSystem + * @param path Relative or absolute path of this Actor System. + * @return Full qualified ActorRef. + */ + @SuppressWarnings("deprecation") + public static ActorRef actorFor(ActorSystem system, String path) { + return system.actorFor(path); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java b/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java new file mode 100644 index 0000000..bf1e618 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/util/HadoopFSLogAppender.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util; + +import org.apache.log4j.RollingFileAppender; + +/** + * Log4j appender for to write to user specified Hadoop filesystem. + */ +public class HadoopFSLogAppender extends RollingFileAppender { + //TODO: implement Log appender on Hadoop filesystem +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java b/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java new file mode 100644 index 0000000..594ccf9 --- /dev/null +++ b/core/src/main/java/org/apache/gearpump/util/RecreateRollingFileAppender.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.util; + +import org.apache.log4j.RollingFileAppender; +import org.apache.log4j.spi.LoggingEvent; + +import java.io.File; + +/** When log file is deleted, tried to recreate it in file system */ +public class RecreateRollingFileAppender extends RollingFileAppender { + + protected long checkFileInterval = 60L; + private long lastCheckTime = 0L; + + @Override + public void append(LoggingEvent event) { + checkInterval(); + super.append(event); + } + + private void checkInterval() { + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastCheckTime) > (checkFileInterval * 1000)) { + checkLogFileExist(); + lastCheckTime = currentTime; + } + } + + private void checkLogFileExist() { + String fileName = super.fileName; + if (fileName != null) { + File logFile = new File(fileName); + if (!logFile.exists()) { + this.setFile(fileName); + this.activateOptions(); + } + } + } + + public long getCheckFileInterval() { + return this.checkFileInterval; + } + + public void setCheckFileInterval(long checkFileInterval) { + this.checkFileInterval = checkFileInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index 4eecf3a..19ffb5f 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -30,14 +30,14 @@ gearpump { ## The installation folder of gearpump home = "" - serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool" + serializer.pool = "org.apache.gearpump.serializer.FastKryoSerializerPool" ## How many slots each worker contains worker.slots = 1000 ## The class responsable for launching the executor process. - ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support. - worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" + ## User can switch to "org.apache.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support. + worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher" ## Number of executors to launch when starting an application application.executor-num = 1 @@ -57,7 +57,7 @@ gearpump { master-resource-manager-container-id = "" ## To enable worker use cgroup to make resource isolation, - ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" + ## set gearpump.worker.executor-process-launcher = "org.apache.gearpump.cluster.worker.CGroupProcessLauncher" ## ## Before enable it, you should also make sure: ## 1. Linux version (>= 2.6.18) @@ -176,7 +176,7 @@ gearpump { ### Whitelist for Metrics Aggregator class. ### See class [[MetricsAggregator]] for more information. metrics-aggregator-class { - ## Format io.gearpump.KeyFullClassName = "" + ## Format org.apache.gearpump.KeyFullClassName = "" } } } @@ -219,9 +219,9 @@ gearpump { ### Gearpump has built-in serialization framework using Kryo. ### User are allowed to use a different serialization framework, like Protobuf - ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how + ### See [org.apache.gearpump.serializer.FastKryoSerializationFramework] to find how ### a custom serialization framework can be defined. - serialization-framework = "io.gearpump.serializer.FastKryoSerializationFramework" + serialization-framework = "org.apache.gearpump.serializer.FastKryoSerializationFramework" ### Define where the submitted jar file will be stored at @@ -238,7 +238,7 @@ gearpump { ### If you don't know what is this about, don't change it ######################### scheduling { - scheduler-class = "io.gearpump.cluster.scheduler.PriorityScheduler" + scheduler-class = "org.apache.gearpump.cluster.scheduler.PriorityScheduler" } ############################################# @@ -396,15 +396,15 @@ gearpump-ui { ## authentication channel like OAuth2. ## ## User can replace this with a custom User-Password based authenticator, - ## which implements interface io.gearpump.security.Authenticator + ## which implements interface org.apache.gearpump.security.Authenticator ## - authenticator = "io.gearpump.security.ConfigFileBasedAuthenticator" + authenticator = "org.apache.gearpump.security.ConfigFileBasedAuthenticator" - ## Configuration options for authenticator io.gearpump.security.ConfigFileBasedAuthenticator + ## Configuration options for authenticator org.apache.gearpump.security.ConfigFileBasedAuthenticator config-file-based-authenticator = { ## Format: username = "password_hash_value" ## password_hash_value can be generated by running shell tool: - ## bin/gear io.gearpump.security.PasswordUtil -password <your raw password> + ## bin/gear org.apache.gearpump.security.PasswordUtil -password <your raw password> ## Admin users have super permission to do everything admins = { @@ -456,7 +456,7 @@ gearpump-ui { ## For steps to enable OAuth2 Authentication on Google, please view docs/deployment-ui-authentication.md ## "google" { - "class" = "io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator" + "class" = "org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator" ## Please replace "127.0.0.1:8090" with your address of UI service. "callback" = "http://127.0.0.1:8090/login/oauth2/google/callback" @@ -487,7 +487,7 @@ gearpump-ui { ## For steps to enable OAuth2 Authentication for UAA, please view docs/deployment-ui-authentication.md ## "cloudfoundryuaa" { - "class" = "io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator" + "class" = "org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator" ## Please replace "127.0.0.1:8090" with your address of UI service. "callback" = "http://127.0.0.1:8090/login/oauth2/cloudfoundryuaa/callback" @@ -525,7 +525,7 @@ gearpump-ui { ## Define how to do additional authorization check. The class should implement ## interface CloudFoundryUAAOAuth2Authenticator.AdditionalAuthenticator additional-authenticator = { - "class" = "io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator$OrganizationAccessChecker" + "class" = "org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator$OrganizationAccessChecker" ## Please fill the Cloud Foundry API endpoint and organization GUID "organization-url" = "http://<cloud foundry api endpoint>/v2/organizations/<organization-guid>" @@ -593,8 +593,8 @@ akka { daemonic = on extensions = [ - "io.gearpump.transport.Express$", - "io.gearpump.metrics.Metrics$" + "org.apache.gearpump.transport.Express$", + "org.apache.gearpump.metrics.Metrics$" ] loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/log4j.properties b/core/src/main/resources/log4j.properties index 682a11b..cbe0749 100644 --- a/core/src/main/resources/log4j.properties +++ b/core/src/main/resources/log4j.properties @@ -61,7 +61,7 @@ log4j.threshhold=ALL # Rolling File Appender # #log4j.appender.RollingFileAppender=org.apache.log4j.RollingFileAppender -log4j.appender.RollingFileAppender=io.gearpump.util.RecreateRollingFileAppender +log4j.appender.RollingFileAppender=org.apache.gearpump.util.RecreateRollingFileAppender log4j.appender.RollingFileAppender.File=${gearpump.log.dir}/${gearpump.log.file} log4j.appender.RollingFileAppender.checkFileInterval=60 log4j.appender.RollingFileAppender.layout=org.apache.log4j.PatternLayout @@ -86,7 +86,7 @@ log4j.appender.console.layout.ConversionPattern=[%p] [%d{MM/dd/yyyy HH:mm:ss.SSS # # Application Log Appender # -log4j.appender.ApplicationLogAppender=io.gearpump.util.RecreateRollingFileAppender +log4j.appender.ApplicationLogAppender=org.apache.gearpump.util.RecreateRollingFileAppender log4j.appender.ApplicationLogAppender.File=${gearpump.application.log.dir}/${gearpump.application.log.file} log4j.appender.ApplicationLogAppender.checkFileInterval=60 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/Message.scala b/core/src/main/scala/io/gearpump/Message.scala deleted file mode 100644 index aec8089..0000000 --- a/core/src/main/scala/io/gearpump/Message.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump - -/** - * Each message contains a immutable timestamp. - * - * For example, if you take a picture, the time you take the picture is the - * message's timestamp. - * @param msg Accept any type except Null, Nothing and Unit - */ -case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp) - -object Message { - val noTimeStamp: TimeStamp = 0L -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/AppDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala deleted file mode 100644 index 799c20a..0000000 --- a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster - -import scala.reflect.ClassTag - -import akka.actor.{Actor, ActorRef, ActorSystem} -import com.typesafe.config.{Config, ConfigFactory} - -import io.gearpump.cluster.appmaster.WorkerInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.jarstore.FilePath - -/** - * This contains all information to run an application - * - * @param name The name of this application - * @param appMaster The class name of AppMaster Actor - * @param userConfig user configuration. - * @param clusterConfig User provided cluster config, it overrides gear.conf when starting - * new applications. In most cases, you should not need to change it. If you do - * really need to change it, please use ClusterConfigSource(filePath) to - * construct the object, while filePath points to the .conf file. - */ -case class AppDescription( - name: String, appMaster: String, userConfig: UserConfig, - clusterConfig: Config = ConfigFactory.empty()) - -/** - * Each job, streaming or not streaming, need to provide an Application class. - * The master uses this class to start AppMaster. - */ -trait Application { - - /** Name of this application, must be unique in the system */ - def name: String - - /** Custom user configuration */ - def userConfig(implicit system: ActorSystem): UserConfig - - /** - * AppMaster class, must have a constructor like this: - * this(appContext: AppMasterContext, app: AppDescription) - */ - def appMaster: Class[_ <: ApplicationMaster] -} - -object Application { - def apply[T <: ApplicationMaster]( - name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = { - new DefaultApplication(name, userConfig, - tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]]) - } - - class DefaultApplication( - override val name: String, inputUserConfig: UserConfig, - val appMaster: Class[_ <: ApplicationMaster]) extends Application { - override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig - } - - def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem) - : AppDescription = { - val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config) - AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys) - } -} - -/** - * Used for verification. All AppMaster must extend this interface - */ -abstract class ApplicationMaster extends Actor - -/** - * This contains context information when starting an AppMaster - * - * @param appId application instance id assigned, it is unique in the cluster - * @param username The username who submitted this application - * @param resource Resouce allocated to start this AppMaster daemon. AppMaster are allowed to - * request more resource from Master. - * @param appJar application Jar. If the jar is already in classpath, then it can be None. - * @param masterProxy The proxy to master actor, it bridges the messages between appmaster - * and master - * @param registerData AppMaster are required to send this data to Master by when doing - * RegisterAppMaster. - */ -case class AppMasterContext( - appId: Int, - username: String, - resource: Resource, - workerInfo: WorkerInfo, - appJar: Option[AppJar], - masterProxy: ActorRef, - registerData: AppMasterRegisterData) - -/** - * Jar file container in the cluster - * - * @param name A meaningful name to represent this jar - * @param filePath Where the jar file is stored. - */ -case class AppJar(name: String, filePath: FilePath) - -/** - * Serves as the context to start an Executor JVM. - */ -// TODO: ExecutorContext doesn't belong to this package in logic. -case class ExecutorContext( - executorId: Int, worker: WorkerInfo, appId: Int, appName: String, - appMaster: ActorRef, resource: Resource) - -/** - * JVM configurations to start an Executor JVM. - * - * @param classPath When executor is created by a worker JVM, executor automatically inherits - * parent worker's classpath. Sometimes, you still want to add some extra - * classpath, you can do this by specify classPath option. - * @param jvmArguments java arguments like -Dxx=yy - * @param mainClass Executor main class name like io.gearpump.xx.AppMaster - * @param arguments Executor command line arguments - * @param jar application jar - * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. - * It uses io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE - * to pass the config to executor process - */ -// TODO: ExecutorContext doesn't belong to this package in logic. -case class ExecutorJVMConfig( - classPath: Array[String], jvmArguments: Array[String], mainClass: String, - arguments: Array[String], jar: Option[AppJar], username: String, - executorAkkaConfig: Config = ConfigFactory.empty()) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala deleted file mode 100644 index 5cc49e7..0000000 --- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster - -import java.io.File - -import com.typesafe.config._ - -import io.gearpump.util.Constants._ -import io.gearpump.util.{Constants, FileUtils, LogUtil, Util} - -/** - * - * All Gearpump application should use this class to load configurations. - * - * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also - * resolve config from file gear.conf and geardefault.conf. - * - * Overriding order: - * {{{ - * System Properties - * > Custom configuration file (by using system property -Dgearpump.config.file) > - * > gear.conf - * > geardefault.conf - * > reference.conf - * }}} - */ - -object ClusterConfig { - /** - * alias for default - * default is a reserved word for java - * @return - */ - def defaultConfig: Config = { - default(APPLICATION) - } - - /** - * default application for user. - * Usually used when user want to start an client application. - * @return - */ - def default(configFile: String = APPLICATION): Config = { - load(configFile).default - } - - /** - * configuration for master node - * @return - */ - def master(configFile: String = null): Config = { - load(configFile).master - } - - /* - * configuration for worker node - */ - def worker(configFile: String = null): Config = { - load(configFile).worker - } - - /** - * configuration for UI server - * @return - */ - def ui(configFile: String = null): Config = { - load(configFile).ui - } - - /** - * try to load system property gearpump.config.file, or use configFile - */ - private def load(configFile: String): Configs = { - val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE)) - file match { - case Some(path) => - LOG.info("loading config file " + path + "..........") - load(ClusterConfigSource(path)) - case None => - LOG.info("loading config file application.conf...") - load(ClusterConfigSource(configFile)) - } - } - - val APPLICATION = "application.conf" - val LOG = LogUtil.getLogger(getClass) - - def saveConfig(conf: Config, file: File): Unit = { - val serialized = conf.root().render() - FileUtils.write(file, serialized) - } - - def render(config: Config, concise: Boolean = false): String = { - if (concise) { - config.root().render(ConfigRenderOptions.concise().setFormatted(true)) - } else { - config.root().render(ConfigRenderOptions.defaults()) - } - } - - /** filter JVM reserved keys and akka default reference.conf */ - def filterOutDefaultConfig(input: Config): Config = { - val updated = filterOutJvmReservedKeys(input) - Util.filterOutOrigin(updated, "reference.conf") - } - - private[gearpump] def load(source: ClusterConfigSource): Configs = { - - val systemProperties = getSystemProperties - - val user = source.getConfig - - val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf", - ConfigParseOptions.defaults.setAllowMissing(true)) - - val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf", - ConfigParseOptions.defaults.setAllowMissing(true)) - - val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault) - - val linux = all.getConfig(LINUX_CONFIG) - - var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG). - withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG) - - if (!akka.util.Helpers.isWindows) { - - // Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac - basic = linux.withFallback(basic) - } - - val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic)) - val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic)) - val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic)) - val app = replaceHost(basic) - - new Configs(master, worker, ui, app) - } - - private def replaceHost(config: Config): Config = { - val hostName = config.getString(Constants.GEARPUMP_HOSTNAME) - config.withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(hostName)) - } - - val JVM_RESERVED_PROPERTIES = List( - "os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file" - ) - - private def getSystemProperties: Config = { - // Excludes default java system properties - JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) => - config.withoutPath(property) - } - } - - class ConfigValidationException(msg: String) extends Exception(msg: String) - - private def filterOutJvmReservedKeys(input: Config): Config = { - val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { (config, key) => - config.withoutPath(key) - } - filterJvmReservedKeys - } - - protected class Configs( - val master: Config, val worker: Config, val ui: Config, val default: Config) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala deleted file mode 100644 index 3a248d7..0000000 --- a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster - -import java.io.File -import scala.language.implicitConversions - -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} - -/** - * Data Source of ClusterConfig - * - * Please use ClusterConfigSource.apply(filePath) to construct this object - */ -sealed trait ClusterConfigSource extends Serializable { - def getConfig: Config -} - -object ClusterConfigSource { - - /** - * Construct ClusterConfigSource from resource name or file path - */ - def apply(filePath: String): ClusterConfigSource = { - - if (null == filePath) { - new ClusterConfigSourceImpl(ConfigFactory.empty()) - } else { - var config = ConfigFactory.parseFileAnySyntax(new File(filePath), - ConfigParseOptions.defaults.setAllowMissing(true)) - - if (null == config || config.isEmpty) { - config = ConfigFactory.parseResourcesAnySyntax(filePath, - ConfigParseOptions.defaults.setAllowMissing(true)) - } - new ClusterConfigSourceImpl(config) - } - } - - implicit def FilePathToClusterConfigSource(filePath: String): ClusterConfigSource = { - apply(filePath) - } - - private class ClusterConfigSourceImpl(config: Config) extends ClusterConfigSource { - override def getConfig: Config = config - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala deleted file mode 100644 index ed187a1..0000000 --- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster - -import scala.util.Try - -import akka.actor.ActorRef -import com.typesafe.config.Config - -import io.gearpump.TimeStamp -import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import io.gearpump.cluster.master.{MasterNode, MasterSummary} -import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import io.gearpump.metrics.Metrics.MetricType - -object ClientToMaster { - case object AddMaster - case class AddWorker(count: Int) - case class RemoveMaster(masterContainerId: String) - case class RemoveWorker(workerContainerId: String) - - /** Command result of AddMaster, RemoveMaster, and etc... */ - case class CommandResult(success: Boolean, exception: String = null) { - override def toString: String = { - val tag = getClass.getSimpleName - if (success) { - s"$tag(success)" - } else { - s"$tag(failure, $exception)" - } - } - } - - /** Submit an application to master */ - case class SubmitApplication( - appDescription: AppDescription, appJar: Option[AppJar], - username: String = System.getProperty("user.name")) - - case class RestartApplication(appId: Int) - case class ShutdownApplication(appId: Int) - - /** Client send ResolveAppId to Master to resolves AppMaster actor path by providing appId */ - case class ResolveAppId(appId: Int) - - /** Client send ResolveWorkerId to master to get the Actor path of worker. */ - case class ResolveWorkerId(workerId: WorkerId) - - /** Get an active Jar store to upload job jars, like wordcount.jar */ - case object GetJarStoreServer - - /** Service address of JarStore */ - case class JarStoreServerAddress(url: String) - - /** Query AppMaster config by providing appId */ - case class QueryAppMasterConfig(appId: Int) - - /** Query worker config */ - case class QueryWorkerConfig(workerId: WorkerId) - - /** Query master config */ - case object QueryMasterConfig - - /** Options for read the metrics from the cluster */ - object ReadOption { - type ReadOption = String - - val Key: String = "readOption" - - /** Read the latest record of the metrics, only return 1 record for one metric name (id) */ - val ReadLatest: ReadOption = "readLatest" - - /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */ - val ReadRecent = "readRecent" - - /** - * Read the history metrics, typically it contains metrics for 48 hours - * - * NOTE: Each hour only contain one or two data points. - */ - val ReadHistory = "readHistory" - } - - /** Query history metrics from master or app master. */ - case class QueryHistoryMetrics( - path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, - aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String]) - - /** - * If there are message loss, the clock would pause for a while. This message is used to - * pin-point which task has stalling clock value, and usually it means something wrong on - * that machine. - */ - case class GetStallingTasks(appId: Int) - - /** - * Request app master for a short list of cluster app that administrators should be aware of. - */ - case class GetLastFailure(appId: Int) -} - -object MasterToClient { - - /** Result of SubmitApplication */ - // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception) - case class SubmitApplicationResult(appId: Try[Int]) - - case class SubmitApplicationResultValue(appId: Int) - - case class ShutdownApplicationResult(appId: Try[Int]) - case class ReplayApplicationResult(appId: Try[Int]) - - /** Return Actor ref of app master */ - case class ResolveAppIdResult(appMaster: Try[ActorRef]) - - /** Return Actor ref of worker */ - case class ResolveWorkerIdResult(worker: Try[ActorRef]) - - case class AppMasterConfig(config: Config) - - case class WorkerConfig(config: Config) - - case class MasterConfig(config: Config) - - case class HistoryMetricsItem(time: TimeStamp, value: MetricType) - - /** - * History metrics returned from master, worker, or app master. - * - * All metric items are organized like a tree, path is used to navigate through the tree. - * For example, when querying with path == "executor0.task1.throughput*", the metrics - * provider picks metrics whose source matches the path. - * - * @param path The path client provided. The returned metrics are the result query of this path. - * @param metrics The detailed metrics. - */ - case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem]) - - /** Return the last error of this streaming application job */ - case class LastFailure(time: TimeStamp, error: String) -} - -trait AppMasterRegisterData - -object AppMasterToMaster { - - /** - * Register an AppMaster by providing a ActorRef, and registerData - * @param registerData The registerData is provided by Master when starting the app master. - * App master should return the registerData back to master. - * Typically registerData hold some context information for this app Master. - */ - - case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData) - - case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable) - - case class RequestResource(appId: Int, request: ResourceRequest) - - /** - * Each application job can save some data in the distributed cluster storage on master nodes. - * - * @param appId App Id of the client application who send the request. - * @param key Key name - * @param value Value to store on distributed cluster storage on master nodes - */ - case class SaveAppData(appId: Int, key: String, value: Any) - - /** The application specific data is successfully stored */ - case object AppDataSaved - - /** Fail to store the application data */ - case object SaveAppDataFailed - - /** Fetch the application specific data that stored previously */ - case class GetAppData(appId: Int, key: String) - - /** The KV data returned for query GetAppData */ - case class GetAppDataResult(key: String, value: Any) - - /** - * AppMasterSummary returned to REST API query. Streaming and Non-streaming - * have very different application info. AppMasterSummary is the common interface. - */ - trait AppMasterSummary { - def appType: String - def appId: Int - def appName: String - def actorPath: String - def status: AppMasterStatus - def startTime: TimeStamp - def uptime: TimeStamp - def user: String - } - - /** Represents a generic application that is not a streaming job */ - case class GeneralAppMasterSummary( - appId: Int, - appType: String = "general", - appName: String = null, - actorPath: String = null, - status: AppMasterStatus = MasterToAppMaster.AppMasterActive, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, - user: String = null) - extends AppMasterSummary - - /** Fetches the list of workers from Master */ - case object GetAllWorkers - - /** Get worker data of workerId */ - case class GetWorkerData(workerId: WorkerId) - - /** Response to GetWorkerData */ - case class WorkerData(workerDescription: WorkerSummary) - - /** Get Master data */ - case object GetMasterData - - /** Response to GetMasterData */ - case class MasterData(masterDescription: MasterSummary) -} - -object MasterToAppMaster { - - /** Resource allocated for application xx */ - case class ResourceAllocated(allocations: Array[ResourceAllocation]) - - /** Master confirm reception of RegisterAppMaster message */ - case class AppMasterRegistered(appId: Int) - - /** Shutdown the application job */ - case object ShutdownAppMaster - - type AppMasterStatus = String - val AppMasterActive: AppMasterStatus = "active" - val AppMasterInActive: AppMasterStatus = "inactive" - val AppMasterNonExist: AppMasterStatus = "nonexist" - - sealed trait StreamingType - case class AppMasterData( - status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, - workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, user: String = null) - - case class AppMasterDataRequest(appId: Int, detail: Boolean = false) - - case class AppMastersData(appMasters: List[AppMasterData]) - case object AppMastersDataRequest - case class AppMasterDataDetailRequest(appId: Int) - case class AppMasterMetricsRequest(appId: Int) extends StreamingType - - case class ReplayFromTimestampWindowTrailingEdge(appId: Int) - - case class WorkerList(workers: List[WorkerId]) -} - -object AppMasterToWorker { - case class LaunchExecutor( - appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig) - - case class ShutdownExecutor(appId: Int, executorId: Int, reason: String) - case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource) -} - -object WorkerToAppMaster { - case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = null) - case class ShutdownExecutorSucceed(appId: Int, executorId: Int) - case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null) -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/UserConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala deleted file mode 100644 index 61de1dd..0000000 --- a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster - -import akka.actor.{ActorSystem, ExtendedActorSystem} -import akka.serialization.JavaSerializer - -import io.gearpump.google.common.io.BaseEncoding - -/** - * Immutable configuration - */ -final class UserConfig(private val _config: Map[String, String]) extends Serializable { - - def withBoolean(key: String, value: Boolean): UserConfig = { - new UserConfig(_config + (key -> value.toString)) - } - - def withDouble(key: String, value: Double): UserConfig = { - new UserConfig(_config + (key -> value.toString)) - } - - def withFloat(key: String, value: Float): UserConfig = { - new UserConfig(_config + (key -> value.toString)) - } - - def withInt(key: String, value: Int): UserConfig = { - new UserConfig(_config + (key -> value.toString)) - } - - def withLong(key: String, value: Long): UserConfig = { - new UserConfig(_config + (key -> value.toString)) - } - - def withString(key: String, value: String): UserConfig = { - if (null == value) { - this - } else { - new UserConfig(_config + (key -> value)) - } - } - - def without(key: String): UserConfig = { - val config = _config - key - new UserConfig(config) - } - - def filter(p: ((String, String)) => Boolean): UserConfig = { - val updated = _config.filter(p) - new UserConfig(updated) - } - - def getBoolean(key: String): Option[Boolean] = { - _config.get(key).map(_.toBoolean) - } - - def getDouble(key: String): Option[Double] = { - _config.get(key).map(_.toDouble) - } - - def getFloat(key: String): Option[Float] = { - _config.get(key).map(_.toFloat) - } - - def getInt(key: String): Option[Int] = { - _config.get(key).map(_.toInt) - } - - def getLong(key: String): Option[Long] = { - _config.get(key).map(_.toLong) - } - - def getString(key: String): Option[String] = { - _config.get(key) - } - - def getBytes(key: String): Option[Array[Byte]] = { - _config.get(key).map(BaseEncoding.base64().decode(_)) - } - - def withBytes(key: String, value: Array[Byte]): UserConfig = { - if (null == value) { - this - } else { - this.withString(key, BaseEncoding.base64().encode(value)) - } - } - - // scalastyle:off line.size.limit - /** - * This de-serializes value to object instance - * - * To do de-serialization, this requires an implicit ActorSystem, as - * the ActorRef and possibly other akka classes deserialization - * requires an implicit ActorSystem. - * - * See Link: - * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization - */ - - def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = { - val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) - _config.get(key).map(BaseEncoding.base64().decode(_)) - .map(serializer.fromBinary(_).asInstanceOf[T]) - } - - /** - * This serializes the object and store it as string. - * - * To do serialization, this requires an implicit ActorSystem, as - * the ActorRef and possibly other akka classes serialization - * requires an implicit ActorSystem. - * - * See Link: - * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization - */ - def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = { - - if (null == value) { - this - } else { - val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) - val bytes = serializer.toBinary(value) - val encoded = BaseEncoding.base64().encode(bytes) - this.withString(key, encoded) - } - } - // scalastyle:on line.size.limit - - def withConfig(other: UserConfig): UserConfig = { - if (null == other) { - this - } else { - new UserConfig(_config ++ other._config) - } - } -} - -object UserConfig { - - def empty: UserConfig = new UserConfig(Map.empty[String, String]) - - def apply(config: Map[String, String]): UserConfig = new UserConfig(config) - - def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala deleted file mode 100644 index 4e8582b..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import akka.actor._ - -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems} -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._ -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.cluster.{AppDescription, AppMasterContext} -import io.gearpump.util.LogUtil - -/** - * This serves as runtime environment for AppMaster. - * When starting an AppMaster, we need to setup the connection to master, - * and prepare other environments. - * - * This also extend the function of Master, by providing a scheduler service for Executor System. - * AppMaster can ask Master for executor system directly. details like requesting resource, - * contacting worker to start a process, and then starting an executor system is hidden from - * AppMaster. - * - * Please use AppMasterRuntimeEnvironment.props() to construct this actor. - */ -private[appmaster] -class AppMasterRuntimeEnvironment( - appContextInput: AppMasterContext, - app: AppDescription, - masters: Iterable[ActorPath], - masterFactory: (AppId, MasterActorRef) => Props, - appMasterFactory: (AppMasterContext, AppDescription) => Props, - masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props) - extends Actor { - - val appId = appContextInput.appId - private val LOG = LogUtil.getLogger(getClass, app = appId) - - import scala.concurrent.duration._ - - private val master = context.actorOf( - masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds))))) - private val appContext = appContextInput.copy(masterProxy = master) - - // Create appMaster proxy to receive command and forward to appmaster - private val appMaster = context.actorOf(appMasterFactory(appContext, app)) - context.watch(appMaster) - - private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData) - private val masterConnectionKeeper = context.actorOf( - masterConnectionKeeperFactory(master, registerAppMaster, self)) - context.watch(masterConnectionKeeper) - - def receive: Receive = { - case MasterConnected => - LOG.info(s"Master is connected, start AppMaster ${appId}...") - appMaster ! StartAppMaster - case MasterStopped => - LOG.error(s"Master is stopped, stop AppMaster ${appId}...") - context.stop(self) - case Terminated(actor) => actor match { - case `appMaster` => - LOG.error(s"AppMaster ${appId} is stopped, shutdown myself") - context.stop(self) - case `masterConnectionKeeper` => - LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself") - context.stop(self) - case _ => // Skip - } - } -} - -object AppMasterRuntimeEnvironment { - - def props( - masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext) - : Props = { - - val master = (appId: AppId, masterProxy: MasterActorRef) => - MasterWithExecutorSystemProvider.props(appId, masterProxy) - - val appMaster = (appContext: AppMasterContext, app: AppDescription) => - LazyStartAppMaster.props(appContext, app) - - val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster: - RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper( - registerAppMaster, master, masterStatusListener = listener)) - - Props(new AppMasterRuntimeEnvironment( - appContextInput, app, masters, master, appMaster, masterConnectionKeeper)) - } - - /** - * This behavior like a AppMaster. Under the hood, It start start the real AppMaster in a lazy - * way. When real AppMaster is not started yet, all messages are stashed. The stashed - * messages are forwarded to real AppMaster when the real AppMaster is started. - * - * Please use LazyStartAppMaster.props to construct this actor - * - * @param appMasterProps underlying AppMaster Props - */ - private[appmaster] - class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash { - - private val LOG = LogUtil.getLogger(getClass, app = appId) - - def receive: Receive = null - - context.become(startAppMaster) - - def startAppMaster: Receive = { - case StartAppMaster => - val appMaster = context.actorOf(appMasterProps, "appmaster") - context.watch(appMaster) - context.become(terminationWatch(appMaster) orElse appMasterService(appMaster)) - unstashAll() - case _ => - stash() - } - - def terminationWatch(appMaster: ActorRef): Receive = { - case Terminated(appMaster) => - LOG.error("appmaster is stopped") - context.stop(self) - } - - def appMasterService(appMaster: ActorRef): Receive = { - case msg => appMaster forward msg - } - } - - private[appmaster] - object LazyStartAppMaster { - def props(appContext: AppMasterContext, app: AppDescription): Props = { - val appMasterProps = Props(Class.forName(app.appMaster), appContext, app) - Props(new LazyStartAppMaster(appContext.appId, appMasterProps)) - } - } - - private[appmaster] case object StartAppMaster - - /** - * This enhance Master by providing new service: StartExecutorSystems - * - * Please use MasterWithExecutorSystemProvider.props to construct this actor - * - */ - private[appmaster] - class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props) - extends Actor { - - val executorSystemProvider: ActorRef = context.actorOf(executorSystemProviderProps) - - override def receive: Receive = { - case request: StartExecutorSystems => - executorSystemProvider forward request - case msg => - master forward msg - } - } - - private[appmaster] - object MasterWithExecutorSystemProvider { - def props(appId: Int, master: ActorRef): Props = { - - val executorSystemLauncher = (appId: Int, session: Session) => - Props(new ExecutorSystemLauncher(appId, session)) - - val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher)) - - Props(new MasterWithExecutorSystemProvider(master, scheduler)) - } - } - - private[appmaster] type AppId = Int - private[appmaster] type MasterActorRef = ActorRef - private[appmaster] type ListenerActorRef = ActorRef -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala deleted file mode 100644 index 5b3e0c5..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import akka.actor.ActorRef -import com.typesafe.config.Config - -import io.gearpump._ -import io.gearpump.cluster.AppMasterRegisterData - -/** Run time info used to start an AppMaster */ -case class AppMasterRuntimeInfo( - appId: Int, - // AppName is the unique Id for an application - appName: String, - worker: ActorRef = null, - user: String = null, - submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, - config: Config = null) - extends AppMasterRegisterData
