Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java (added) +++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java Fri Oct 14 13:29:10 2011 @@ -0,0 +1,488 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hama.Constants; +import org.apache.hama.bsp.sync.SyncServer; +import org.apache.hama.bsp.sync.SyncServerImpl; +import org.apache.hama.checkpoint.CheckpointRunner; + +/** + * This class represents a BSP peer. 
+ */ +public class YARNBSPPeerImpl implements BSPPeer { + + public static final Log LOG = LogFactory.getLog(YARNBSPPeerImpl.class); + + private final Configuration conf; + + private volatile Server server = null; + + private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>(); + private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>(); + private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>(); + private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>(); + private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>(); + + private InetSocketAddress peerAddress; + private TaskStatus currentTaskStatus; + + private TaskAttemptID taskid; + private SyncServer syncService; + private final BSPMessageSerializer messageSerializer; + + public static final class BSPSerializableMessage implements Writable { + final AtomicReference<String> path = new AtomicReference<String>(); + final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>(); + + public BSPSerializableMessage() { + } + + public BSPSerializableMessage(final String path, + final BSPMessageBundle bundle) { + if (null == path) + throw new NullPointerException("No path provided for checkpointing."); + if (null == bundle) + throw new NullPointerException("No data provided for checkpointing."); + this.path.set(path); + this.bundle.set(bundle); + } + + public final String checkpointedPath() { + return this.path.get(); + } + + public final BSPMessageBundle messageBundle() { + return this.bundle.get(); + } + + @Override + public final void write(DataOutput out) throws IOException { + out.writeUTF(this.path.get()); + this.bundle.get().write(out); + } + + @Override + public final void readFields(DataInput 
in) throws IOException { + this.path.set(in.readUTF()); + BSPMessageBundle pack = new BSPMessageBundle(); + pack.readFields(in); + this.bundle.set(pack); + } + + }// serializable message + + final class BSPMessageSerializer { + final Socket client; + final ScheduledExecutorService sched; + + public BSPMessageSerializer(final int port) { + Socket tmp = null; + int cnt = 0; + do { + tmp = init(port); + cnt++; + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + LOG.warn("Thread is interrupted.", ie); + Thread.currentThread().interrupt(); + } + } while (null == tmp && 10 > cnt); + this.client = tmp; + if (null == this.client) + throw new NullPointerException("Client socket is null."); + this.sched = Executors.newScheduledThreadPool(conf.getInt( + "bsp.checkpoint.serializer_thread", 10)); + LOG.info(BSPMessageSerializer.class.getName() + + " is ready to serialize message."); + } + + private Socket init(final int port) { + Socket tmp = null; + try { + tmp = new Socket("localhost", port); + } catch (UnknownHostException uhe) { + LOG.error("Unable to connect to BSPMessageDeserializer.", uhe); + } catch (IOException ioe) { + LOG.warn("Fail to create socket.", ioe); + } + return tmp; + } + + void serialize(final BSPSerializableMessage tmp) throws IOException { + if (LOG.isDebugEnabled()) + LOG.debug("Messages are saved to " + tmp.checkpointedPath()); + final DataOutput out = new DataOutputStream(client.getOutputStream()); + this.sched.schedule(new Callable<Object>() { + public Object call() throws Exception { + tmp.write(out); + return null; + } + }, 0, SECONDS); + } + + public void close() { + try { + this.client.close(); + this.sched.shutdown(); + } catch (IOException io) { + LOG.error("Fail to close client socket.", io); + } + } + + }// message serializer + + /** + * BSPPeer Constructor. + * + * BSPPeer acts on behalf of clients performing bsp() tasks. + * + * @param conf is the configuration file containing bsp peer host, port, etc. 
+ * @param taskid is the id that current process holds. + */ + public YARNBSPPeerImpl(Configuration conf, TaskAttemptID taskid) + throws IOException { + this.conf = conf; + this.taskid = taskid; + + String bindAddress = conf.get(Constants.PEER_HOST, + Constants.DEFAULT_PEER_HOST); + int bindPort = conf + .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + peerAddress = new InetSocketAddress(bindAddress, bindPort); + BSPMessageSerializer msgSerializer = null; + if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) { + msgSerializer = new BSPMessageSerializer(conf.getInt( + "bsp.checkpoint.port", + Integer.parseInt(CheckpointRunner.DEFAULT_PORT))); + } + this.messageSerializer = msgSerializer; + + syncService = SyncServerImpl.getService(conf); + syncService.register(taskid, new Text(peerAddress.getHostName()), + new LongWritable(peerAddress.getPort())); + currentTaskStatus = new TaskStatus(); + } + + public void reinitialize() { + try { + if (LOG.isDebugEnabled()) + LOG.debug("reinitialize(): " + getPeerName()); + this.server = RPC.getServer(this, peerAddress.getHostName(), + peerAddress.getPort(), conf); + server.start(); + LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:" + + peerAddress.getPort()); + syncService = SyncServerImpl.getService(conf); + syncService.register(taskid, new Text(peerAddress.getHostName()), + new LongWritable(peerAddress.getPort())); + } catch (IOException e) { + LOG.error("Fail to start RPC server!", e); + } + } + + @Override + public BSPMessage getCurrentMessage() throws IOException { + return localQueue.poll(); + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress, + * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable) + */ + @Override + public void send(String peerName, BSPMessage msg) throws IOException { + if (peerName.equals(getPeerName())) { + LOG.debug("Local send bytes (" + msg.getData().toString() + ")"); + 
localQueueForNextIteration.add(msg); + } else { + LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName); + InetSocketAddress targetPeerAddress = null; + // Get socket for target peer. + if (peerSocketCache.containsKey(peerName)) { + targetPeerAddress = peerSocketCache.get(peerName); + } else { + targetPeerAddress = getAddress(peerName); + peerSocketCache.put(peerName, targetPeerAddress); + } + ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues + .get(targetPeerAddress); + if (queue == null) { + queue = new ConcurrentLinkedQueue<BSPMessage>(); + } + queue.add(msg); + outgoingQueues.put(targetPeerAddress, queue); + } + } + + // TODO not working properly! + private String checkpointedPath() { + String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/"); + // String ckptPath = backup + jobConf.getJobID().toString() + "/" + // + getSuperstepCount() + "/" + this.taskid.toString(); + // if (LOG.isDebugEnabled()) + // LOG.debug("Messages are to be saved to " + ckptPath); + return backup; + } + + /* + * (non-Javadoc) + * @see org.apache.hama.bsp.BSPPeerInterface#sync() + */ + @Override + public void sync() throws IOException, InterruptedException { + enterBarrier(); + Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues + .entrySet().iterator(); + + while (it.hasNext()) { + Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it + .next(); + + BSPPeer peer = peers.get(entry.getKey()); + if (peer == null) { + try { + peer = getBSPPeerConnection(entry.getKey()); + } catch (NullPointerException ne) { + LOG.error(taskid + ": " + entry.getKey().getHostName() + + " doesn't exists."); + } + } + Iterable<BSPMessage> messages = entry.getValue(); + BSPMessageBundle bundle = new BSPMessageBundle(); + for (BSPMessage message : messages) { + bundle.addMessage(message); + } + + // checkpointing + if (null != this.messageSerializer) { + this.messageSerializer.serialize(new 
BSPSerializableMessage( + checkpointedPath(), bundle)); + } + + peer.put(bundle); + } + + leaveBarrier(); + currentTaskStatus.incrementSuperstepCount(); + + // Clear outgoing queues. + clearOutgoingQueues(); + + // Add non-processed messages from this iteration for the next's queue. + while (!localQueue.isEmpty()) { + BSPMessage message = localQueue.poll(); + localQueueForNextIteration.add(message); + } + // Switch local queues. + localQueue = localQueueForNextIteration; + localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>(); + } + + protected boolean enterBarrier() throws InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + + this.getSuperstepCount()); + } + + syncService.enterBarrier(taskid); + return true; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + protected boolean leaveBarrier() throws InterruptedException { + syncService.leaveBarrier(taskid); + return true; + } + + public void clear() { + this.localQueue.clear(); + this.outgoingQueues.clear(); + } + + @Override + public void close() throws IOException { + this.clear(); + syncService.deregisterFromBarrier(taskid, + new Text(this.peerAddress.getHostName()), new LongWritable( + this.peerAddress.getPort())); + if (server != null) + server.stop(); + if (null != messageSerializer) + this.messageSerializer.close(); + } + + @Override + public void put(BSPMessage msg) throws IOException { + this.localQueueForNextIteration.add(msg); + } + + @Override + public void put(BSPMessageBundle messages) throws IOException { + for (BSPMessage message : messages.getMessages()) { + this.localQueueForNextIteration.add(message); + } + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return BSPPeer.versionID; + } + + protected BSPPeer getBSPPeerConnection(InetSocketAddress addr) + throws NullPointerException { + BSPPeer peer; + synchronized (this.peers) { + peer = 
peers.get(addr); + + if (peer == null) { + try { + peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr, + this.conf); + } catch (IOException e) { + LOG.error(e); + } + this.peers.put(addr, peer); + } + } + + return peer; + } + + /** + * @return the string as host:port of this Peer + */ + public String getPeerName() { + return peerAddress.getHostName() + ":" + peerAddress.getPort(); + } + + private InetSocketAddress getAddress(String peerName) { + String[] peerAddrParts = peerName.split(":"); + if (peerAddrParts.length != 2) { + throw new ArrayIndexOutOfBoundsException( + "Peername must consist of exactly ONE \":\"! Given peername was: " + + peerName); + } + return new InetSocketAddress(peerAddrParts[0], + Integer.parseInt(peerAddrParts[1])); + } + + @Override + public String[] getAllPeerNames() { + return syncService.getAllPeerNames().get(); + } + + /** + * @return the number of messages + */ + public int getNumCurrentMessages() { + return localQueue.size(); + } + + /** + * Sets the current status + * + * @param currentTaskStatus + */ + public void setCurrentTaskStatus(TaskStatus currentTaskStatus) { + this.currentTaskStatus = currentTaskStatus; + } + + /** + * @return the count of current super-step + */ + public long getSuperstepCount() { + return currentTaskStatus.getSuperstepCount(); + } + + /** + * @return the size of local queue + */ + public int getLocalQueueSize() { + return localQueue.size(); + } + + /** + * @return the sync service + */ + public SyncServer getSyncService() { + return syncService; + } + + /** + * @return the size of outgoing queue + */ + public int getOutgoingQueueSize() { + return outgoingQueues.size(); + } + + /** + * Clears local queue + */ + public void clearLocalQueue() { + this.localQueue.clear(); + } + + /** + * Clears outgoing queues + */ + public void clearOutgoingQueues() { + this.outgoingQueues.clear(); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int 
clientMethodsHash) throws IOException { + // TODO Auto-generated method stub + return new ProtocolSignature(); + } +}
Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (added) +++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Fri Oct 14 13:29:10 2011 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; +import org.apache.zookeeper.KeeperException; + +public class YarnSerializePrinting { + + public static class HelloBSP extends BSP { + public static final Log LOG = LogFactory.getLog(HelloBSP.class); + private Configuration conf; + private final static int PRINT_INTERVAL = 1000; + private int num; + + @Override + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, + InterruptedException { + num = conf.getInt("bsp.peers.num", 0); + LOG.info(bspPeer.getAllPeerNames()); + int i = 0; + for (String otherPeer : bspPeer.getAllPeerNames()) { + String peerName = bspPeer.getPeerName(); + if (peerName.equals(otherPeer)) { + LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName); + } + + Thread.sleep(PRINT_INTERVAL); + bspPeer.sync(); + i++; + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + } + + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + HamaConfiguration conf = new HamaConfiguration(); + // TODO some keys that should be within a conf + conf.set("yarn.resourcemanager.address", "0.0.0.0:8040"); + conf.set("bsp.local.dir", "/tmp/bsp-yarn/"); + + YARNBSPJob job = new YARNBSPJob(conf); + job.setBspClass(HelloBSP.class); + job.setJarByClass(HelloBSP.class); + job.setJobName("Serialize Printing"); + job.setMemoryUsedPerTaskInMb(50); + job.setNumBspTask(2); + // TODO waitForCompletion(true) throws exceptions + job.waitForCompletion(false); + } +} Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java ------------------------------------------------------------------------------ svn:eol-style = 
native Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java (added) +++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java Fri Oct 14 13:29:10 2011 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.sync; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Custom writable for string arrays, because ArrayWritable has no default + * constructor and is broken. + * + */ +public class StringArrayWritable implements Writable { + + private String[] array; + + public StringArrayWritable() { + super(); + } + + public StringArrayWritable(String[] array) { + super(); + this.array = array; + } + + // no defensive copy needed because this always comes from an rpc call. 
+ public String[] get() { + return array; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(array.length); + for (String s : array) { + out.writeUTF(s); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + array = new String[in.readInt()]; + for (int i = 0; i < array.length; i++) { + array[i] = in.readUTF(); + } + } + +} Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added) +++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Fri Oct 14 13:29:10 2011 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hama.bsp.sync; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hama.bsp.TaskAttemptID; + +/** + * Hadoop RPC based barrier synchronization service. + * + */ +public interface SyncServer extends VersionedProtocol { + + public static final long versionID = 0L; + + public void enterBarrier(TaskAttemptID id); + + public void leaveBarrier(TaskAttemptID id); + + public void register(TaskAttemptID id, Text hostAddress, LongWritable port); + + public LongWritable getSuperStep(); + + public StringArrayWritable getAllPeerNames(); + + public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, + LongWritable port); + + public void stopServer(); + +} Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java (added) +++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java Fri Oct 14 13:29:10 2011 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.sync; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hama.bsp.TaskAttemptID; + +/** + * Synchronization Deamon. 
<br\> + */ +public class SyncServerImpl implements SyncServer, Callable<Long> { + + private static final Log LOG = LogFactory.getLog(SyncServerImpl.class); + + private Configuration conf = new Configuration(); + private Server server; + + private int parties; + + private CyclicBarrier barrier; + private CyclicBarrier leaveBarrier; + private Set<Integer> partySet; + private Set<String> peerAddresses; + + private volatile long superstep = 0L; + + public SyncServerImpl(int parties, String host, int port) throws IOException { + this.parties = parties; + this.barrier = new CyclicBarrier(parties); + this.leaveBarrier = new CyclicBarrier(parties, new SuperStepIncrementor( + this)); + + this.partySet = Collections.synchronizedSet(new HashSet<Integer>(parties)); + // tree set so there is ascending order for consistent returns in + // getAllPeerNames() + this.peerAddresses = Collections.synchronizedSet(new TreeSet<String>()); + // allocate ten more rpc handler than parties for additional services to + // plug in or to deal with failure. + this.server = RPC.getServer(this, host, port, parties + 10, false, conf); + LOG.info("Sync Server is now up at: " + host + ":" + port + "!"); + } + + public void start() throws IOException { + server.start(); + } + + @Override + public void stopServer() { + server.stop(); + } + + public void join() throws InterruptedException { + server.join(); + } + + public static SyncServer getService(Configuration conf) + throws NumberFormatException, IOException { + String syncAddress = conf.get("hama.sync.server.address"); + if (syncAddress == null || syncAddress.isEmpty() + || !syncAddress.contains(":")) { + throw new IllegalArgumentException( + "Server sync address must contain a colon and must be non-empty and not-null! 
Property \"hama.sync.server.address\" was: " + + syncAddress); + } + String[] hostPort = syncAddress.split(":"); + return (SyncServer) RPC.waitForProxy(SyncServer.class, + SyncServer.versionID, + new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])), conf); + + } + + @Override + public void enterBarrier(TaskAttemptID id) { + LOG.info("Task: " + id.getId() + " entered Barrier!"); + if (partySet.contains(id.getId())) { + try { + barrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } else { + LOG.warn("TaskID " + id + " is no verified task!"); + } + } + + @Override + public void leaveBarrier(TaskAttemptID id) { + LOG.info("Task: " + id.getId() + " leaves Barrier!"); + if (partySet.contains(id.getId())) { + try { + leaveBarrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } else { + LOG.warn("TaskID " + id + " is no verified task!"); + } + } + + @Override + public synchronized void register(TaskAttemptID id, Text hostAddress, + LongWritable port) { + partySet.add(id.getId()); + String peer = hostAddress.toString() + ":" + port.get(); + peerAddresses.add(peer); + LOG.info("Registered: " + id.getId() + " for peer " + peer); + if (partySet.size() > parties) { + LOG.warn("Registered more tasks than configured!"); + } + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return clientVersion; + } + + private static class SuperStepIncrementor implements Runnable { + + private final SyncServerImpl instance; + + public SuperStepIncrementor(SyncServerImpl syncServer) { + this.instance = syncServer; + } + + @Override + public void run() { + synchronized (instance) { + this.instance.superstep += 1L; + LOG.info("Entering superstep: " + this.instance.superstep); + } + } + + } + + public static void main(String[] args) throws 
IOException, + InterruptedException { + LOG.info(Arrays.toString(args)); + if (args.length == 3) { + SyncServerImpl syncServer = new SyncServerImpl(Integer.valueOf(args[0]), + args[1], Integer.valueOf(args[2])); + syncServer.start(); + syncServer.join(); + } else { + throw new IllegalArgumentException( + "Argument count does not match 3! Given size was " + args.length + + " and parameters were " + Arrays.toString(args)); + } + } + + @Override + public Long call() throws Exception { + this.start(); + this.join(); + return this.superstep; + } + + @Override + public synchronized LongWritable getSuperStep() { + return new LongWritable(superstep); + } + + @Override + public synchronized StringArrayWritable getAllPeerNames() { + return new StringArrayWritable( + peerAddresses.toArray(new String[peerAddresses.size()])); + } + + @Override + public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress, + LongWritable port) { + // TODO Auto-generated method stub + // basically has to recreate the barriers and remove from the two basic + // sets. 
+ } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + // TODO Auto-generated method stub + return new ProtocolSignature(); + } + +} Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/hama/trunk/yarn/src/main/resources/log4j.properties URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/resources/log4j.properties?rev=1183352&view=auto ============================================================================== --- incubator/hama/trunk/yarn/src/main/resources/log4j.properties (added) +++ incubator/hama/trunk/yarn/src/main/resources/log4j.properties Fri Oct 14 13:29:10 2011 @@ -0,0 +1,243 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Define some default values that can be overridden by system properties +hama.root.logger=INFO,console +hama.log.dir=. +hama.log.file=hama.log + +# Define the root logger to the system property "hama.root.logger". 
+log4j.rootLogger=${hama.root.logger} + +# Logging Threshold +log4j.threshhold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# Debugging Pattern format +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +# +# TaskLog Appender +# + +#Default values +hama.tasklog.taskid=null +hama.tasklog.noKeepSplits=4 +hama.tasklog.totalLogFileSize=100 +hama.tasklog.purgeLogSplits=true +hama.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender +log4j.appender.TLA.taskId=${hama.tasklog.taskid} +log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize} + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG +#log4j.logger.org.apache.hadoop.dfs=DEBUG +#log4j.logger.org.apache.hama=DEBUG +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Define some default values that can be overridden by system properties +hama.root.logger=INFO,console +hama.log.dir=. +hama.log.file=hama.log + +# Define the root logger to the system property "hama.root.logger". +log4j.rootLogger=${hama.root.logger} + +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file} + +# Rollover at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# Debugging Pattern format +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +# +# TaskLog Appender +# + +#Default values +hama.tasklog.taskid=null +hama.tasklog.noKeepSplits=4 +hama.tasklog.totalLogFileSize=100 +hama.tasklog.purgeLogSplits=true +hama.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender +log4j.appender.TLA.taskId=${hama.tasklog.taskid} +log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize} + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# +#
console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG +#log4j.logger.org.apache.hadoop.dfs=DEBUG +#log4j.logger.org.apache.hama=DEBUG +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Define some default values that can be overridden by system properties +hama.root.logger=INFO,console +hama.log.dir=. +hama.log.file=hama.log + +# Define the root logger to the system property "hama.root.logger". 
+log4j.rootLogger=${hama.root.logger} + +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file} + +# Rollover at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# Debugging Pattern format +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + +# +# TaskLog Appender +# + +#Default values +hama.tasklog.taskid=null +hama.tasklog.noKeepSplits=4 +hama.tasklog.totalLogFileSize=100 +hama.tasklog.purgeLogSplits=true +hama.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender +log4j.appender.TLA.taskId=${hama.tasklog.taskid} +log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize} + +log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG +#log4j.logger.org.apache.hadoop.dfs=DEBUG +#log4j.logger.org.apache.hama=DEBUG Propchange: incubator/hama/trunk/yarn/src/main/resources/log4j.properties ------------------------------------------------------------------------------ svn:eol-style = native