Author: chl501 Date: Mon Apr 2 15:26:37 2012 New Revision: 1308383 URL: http://svn.apache.org/viewvc?rev=1308383&view=rev Log: [HAMA-542] Federator for collecting GroomServer information
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/ incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java Removed: incubator/hama/trunk/core/src/test/org/ Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Apr 2 15:26:37 2012 @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -69,11 +70,10 @@ GroomServerManager, Watcher { public static final Log LOG = LogFactory.getLog(BSPMaster.class); public static final String localModeMessage = "Local mode detected, no launch of the daemon needed."; - public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000; private static final int FS_ACCESS_RETRY_PERIOD = 10000; private HamaConfiguration conf; - private ZooKeeper zk = null; + ZooKeeper zk = null; private String bspRoot = null; /** @@ -86,7 +86,7 @@ GroomServerManager, Watcher { static long JOBINIT_SLEEP_INTERVAL = 2000; // States - State state = State.INITIALIZING; + final AtomicReference<State> state = new AtomicReference<State>(State.INITIALIZING); // Attributes String masterIdentifier; @@ -443,14 +443,12 @@ GroomServerManager, Watcher { public static BSPMaster startMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException { - BSPMaster result = new BSPMaster(conf, identifier); + // init zk root and child nodes + result.initZK(conf); // need init zk before scheduler starts result.taskScheduler.setGroomServerManager(result); result.taskScheduler.start(); - // init zk root and child nodes - result.initZK(conf); - return result; } @@ -504,7 +502,6 @@ GroomServerManager, Watcher { } catch (Exception e) { LOG.warn("Could not clear zookeeper nodes.", e); - } } @@ -587,9 +584,7 @@ GroomServerManager, Watcher { this.masterServer.start(); - synchronized (this) { - state = State.RUNNING; - } + state.set(State.RUNNING); instructor = new Instructor(); instructor.bind(ReportGroomStatusDirective.class, @@ -671,10 +666,10 @@ GroomServerManager, Watcher { this.totalTaskCapacity = tasksPerGroom * numGroomServers; if (detailed) { - return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state); + return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state.get()); } else { return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity, - state); + state.get()); } } @@ -865,7 +860,7 @@ GroomServerManager, Watcher { } public BSPMaster.State currentState() { - return this.state; + return this.state.get(); } @Override Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Apr 2 15:26:37 2012 @@ -383,9 +383,7 @@ public class GroomServer implements Runn } } - if (conf.getBoolean("bsp.monitor.enabled", true)) { - // TODO: conf.get("bsp.monitor.class.impl", "Monitor.class") - // so user can switch to customized monitor impl if necessary. + if (conf.getBoolean("bsp.monitor.enabled", false)) { new Monitor(conf, zk, this.groomServerName).start(); } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Apr 2 15:26:37 2012 @@ -23,12 +23,29 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import static java.util.concurrent.TimeUnit.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.GroomServerStatus; import org.apache.hama.ipc.GroomProtocol; +import org.apache.hama.monitor.Federator; +import org.apache.hama.monitor.Federator.Act; +import org.apache.hama.monitor.Federator.CollectorHandler; +import org.apache.hama.monitor.Metric; +import org.apache.hama.monitor.MetricsRecord; +import org.apache.hama.monitor.Monitor; +import org.apache.hama.monitor.ZKCollector; +import org.apache.zookeeper.ZooKeeper; /** * A simple task scheduler. @@ -41,22 +58,27 @@ class SimpleTaskScheduler extends TaskSc public static final String PROCESSING_QUEUE = "processingQueue"; public static final String FINISHED_QUEUE = "finishedQueue"; - private QueueManager queueManager; - private volatile boolean initialized; - private JobListener jobListener; - private JobProcessor jobProcessor; + private final AtomicReference<QueueManager> queueManager = + new AtomicReference<QueueManager>(); + private AtomicBoolean initialized = new AtomicBoolean(false); + private final JobListener jobListener; + private final JobProcessor jobProcessor; + private final AtomicReference<Federator> federator = + new AtomicReference<Federator>(); + private final ConcurrentMap<String, MetricsRecord> repository = + new ConcurrentHashMap<String, MetricsRecord>(); + private final ScheduledExecutorService scheduler; private class JobListener extends JobInProgressListener { @Override public void jobAdded(JobInProgress job) throws IOException { - queueManager.initJob(job); // init task - queueManager.addJob(WAIT_QUEUE, job); + queueManager.get().initJob(job); // init task + queueManager.get().addJob(WAIT_QUEUE, job); } @Override public void jobRemoved(JobInProgress job) throws IOException { - // queueManager.removeJob(WAIT_QUEUE, job); - queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job); + queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job); } } @@ -70,19 +92,19 @@ class SimpleTaskScheduler extends TaskSc * JobInProgress from Wait Queue to Processing Queue. */ public void run() { - if (false == initialized) { + if (!initialized.get()) { throw new IllegalStateException("SimpleTaskScheduler initialization" + " is not yet finished!"); } - while (initialized) { - Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE); + while (initialized.get()) { + Queue<JobInProgress> queue = queueManager.get().findQueue(WAIT_QUEUE); if (null == queue) { LOG.error(WAIT_QUEUE + " does not exist."); throw new NullPointerException(WAIT_QUEUE + " does not exist."); } // move a job from the wait queue to the processing queue JobInProgress j = queue.removeJob(); - queueManager.addJob(PROCESSING_QUEUE, j); + queueManager.get().addJob(PROCESSING_QUEUE, j); // schedule Collection<GroomServerStatus> glist = groomServerManager .groomServerStatusKeySet(); @@ -161,32 +183,89 @@ class SimpleTaskScheduler extends TaskSc } } + /** + * Periodically collect metrics info. + */ + private class JvmCollector implements Runnable { + final Federator federator; + final ZooKeeper zk; + JvmCollector(final Federator federator, final ZooKeeper zk) { + this.federator = federator; + this.zk = zk; + } + public void run() { + for(GroomServerStatus status: + groomServerManager.groomServerStatusKeySet()) { + final String groom = status.getGroomName(); + final String jvmPath = Monitor.MONITOR_ROOT_PATH+groom+"/metrics/jvm"; + final Act act = + new Act(new ZKCollector(zk, "jvm", "Jvm metrics.", jvmPath), + new CollectorHandler() { + public void handle(Future future) { + try { + MetricsRecord record = (MetricsRecord)future.get(); + if(null != record) { + if(LOG.isDebugEnabled()) { + for(Metric metric: record.metrics()) { + LOG.debug("Metric name:"+metric.name()+" metric value:"+metric.value()); + } + } + repository.put(groom, record); + } + } catch (InterruptedException ie) { + LOG.warn(ie); + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + LOG.warn(ee.getCause()); + } + } + }); + this.federator.register(act); + } + } + } + public SimpleTaskScheduler() { this.jobListener = new JobListener(); this.jobProcessor = new JobProcessor(); + this.scheduler = Executors.newSingleThreadScheduledExecutor(); } @Override public void start() { - this.queueManager = new QueueManager(getConf()); // TODO: need factory? - this.queueManager.createFCFSQueue(WAIT_QUEUE); - this.queueManager.createFCFSQueue(PROCESSING_QUEUE); - this.queueManager.createFCFSQueue(FINISHED_QUEUE); + if(initialized.get()) + throw new IllegalStateException(SimpleTaskScheduler.class.getSimpleName()+ + " is started."); + this.queueManager.set(new QueueManager(getConf())); + this.federator.set(new Federator((HamaConfiguration)getConf())); + this.queueManager.get().createFCFSQueue(WAIT_QUEUE); + this.queueManager.get().createFCFSQueue(PROCESSING_QUEUE); + this.queueManager.get().createFCFSQueue(FINISHED_QUEUE); groomServerManager.addJobInProgressListener(this.jobListener); - this.initialized = true; + this.initialized.set(true); + if(null != getConf() && + getConf().getBoolean("bsp.federator.enabled", false)) { + this.federator.get().start(); + } this.jobProcessor.start(); + if(null != getConf() && + getConf().getBoolean("bsp.federator.enabled", false)) { + this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(), + ((BSPMaster)groomServerManager).zk), 5, 5, SECONDS); + } } @Override public void terminate() { - this.initialized = false; + this.initialized.set(false); if (null != this.jobListener) groomServerManager.removeJobInProgressListener(this.jobListener); + this.jobProcessor.interrupt(); + this.federator.get().interrupt(); } @Override public Collection<JobInProgress> getJobs(String queue) { - return (queueManager.findQueue(queue)).jobs(); - // return jobQueue; + return (queueManager.get().findQueue(queue)).jobs(); } } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java Mon Apr 2 15:26:37 2012 @@ -47,8 +47,11 @@ public final class Configurator { new ConcurrentHashMap<String, Long>(); /** + * Configure plugins directory for monitoring GroomServer. * @param conf file points out the plugin dir location. - * @return Map contains jar path and task to be executed. + * @return Map contains jar path and task to be executed; null if + * plugin directory, default set to $HAMA_HOME/plugins, doesn't + * exist. */ public static Map<String, Task> configure(HamaConfiguration conf, MonitorListener listener) throws IOException { @@ -56,6 +59,7 @@ public final class Configurator { String pluginPath = conf.get("bsp.monitor.plugins.dir", hamaHome+File.separator+DEFAULT_PLUGINS_DIR); File pluginDir = new File(pluginPath); + if(null == pluginDir || null == pluginDir.listFiles()) return null; ClassLoader loader = Thread.currentThread().getContextClassLoader(); Map<String, Task> taskList = new HashMap<String, Task>(); LOG.debug("Scanning jar files within "+pluginDir+"."); Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java?rev=1308383&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java Mon Apr 2 15:26:37 2012 @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hama.HamaConfiguration; + +public final class Federator extends Thread { + + public static final Log LOG = LogFactory.getLog(Federator.class); + + private final HamaConfiguration configuration; + + private final ExecutorService workers; + + private final BlockingQueue<Act> commands = new LinkedBlockingQueue<Act>(); + + private static class ServiceWorker implements Callable { + + final Collector collector; + + public ServiceWorker(final Collector collector) { + this.collector = collector; + } + + @Override + public Object call() throws Exception { + return this.collector.harvest(); + } + + } + + /** + * A token binds collector and handler together. + */ + public static final class Act { + final Collector collector; + final CollectorHandler handler; + + public Act(final Collector collector, final CollectorHandler handler) { + this.collector = collector; + this.handler = handler; + } + + public final Collector collector() { + return this.collector; + } + + public final CollectorHandler handler() { + return this.handler; + } + + } + + /** + * Purpose to handle the result returned by Collector. + */ + public static interface CollectorHandler { + + /** + * Handle the result. + */ + void handle(Future future); + + } + + /** + * Collect GroomServer information from repository. + */ + public static interface Collector { + + /** + * Function is called to collect GroomServer information from specific + * place. + */ + Object harvest() throws Exception; + + } + + public Federator(final HamaConfiguration configuration) { + this.configuration = configuration; + this.workers = Executors.newCachedThreadPool(); + setName(Federator.class.getSimpleName()); + setDaemon(true); + } + + public final void register(final Act act) { + try { + if(null == act || null == act.collector() || null == act.handler()) + throw new NullPointerException("Collector or CollectorHandler "+ + " is not provided."); + commands.put(act); + } catch (InterruptedException ie) { + LOG.error(ie); + Thread.currentThread().interrupt(); + } + } + + public void run() { + try { + while(!Thread.currentThread().interrupted()) { + Act act = commands.take(); + act.handler().handle(workers.submit(new ServiceWorker(act.collector()))); + } + } catch (InterruptedException ie) { + LOG.error(ie); + Thread.currentThread().interrupt(); + } + } + +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java Mon Apr 2 15:26:37 2012 @@ -17,7 +17,8 @@ */ package org.apache.hama.monitor; -import java.io.File; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ import org.apache.zookeeper.ZooDefs.Ids; public final class Monitor extends Thread implements MonitorListener { public static final Log LOG = LogFactory.getLog(Monitor.class); + public static final String MONITOR_ROOT_PATH = "/monitor/"; private final Map<String, TaskWorker> workers = // <jar path, task worker> new ConcurrentHashMap<String, TaskWorker>(); @@ -192,25 +194,32 @@ public final class Monitor extends Threa public void handle(Result result) { Object obj = result.get(); if(obj instanceof MetricsRecord) { - String znode = "/monitor/"+this.groomServerName+"/metrics/"+result.name(); + String znode = + MONITOR_ROOT_PATH+this.groomServerName+"/metrics/"+result.name(); ZKUtil.create(zk, znode); // recursively create znode path MetricsRecord record = (MetricsRecord) obj; + int cnt = 0; for(Metric<? extends Number> metric: record.metrics()) { + cnt++; String name = metric.name(); Number value = metric.value(); try { // znode must exists so that child (znode/name) can be created. if(null != this.zk.exists(znode, false)) { - if(LOG.isDebugEnabled()) - LOG.debug("Name & value are going to be publish to zk -> ["+name+"] ["+value+"]"); - if(null == zk.exists(znode+File.separator+name, false)) { - String p = this.zk.create(znode+File.separator+name, toBytes(value), + String suffix = suffix(value);; + if(LOG.isDebugEnabled()) { + LOG.debug("Publish name ["+name+"] and value ["+value+ + "] to zk."); + } + final String zpath = znode+ZKUtil.ZK_SEPARATOR+name+suffix; + if(null == zk.exists(zpath, false)) { + String p = this.zk.create(zpath, toBytes(value), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - LOG.debug("Successfully publish data to zk with path to `"+p+"'"); + LOG.debug("Publish data to zk with path to `"+p+"'"); } else { // can we just update by increasing 1 version? - this.zk.setData(znode+File.separator+name, toBytes(value), -1); - LOG.debug("Successfully update data in znode: "+znode); + this.zk.setData(zpath, toBytes(value), -1); + LOG.debug("Update data in znode: "+znode); } } } catch (KeeperException ke) { @@ -225,21 +234,56 @@ public final class Monitor extends Threa } } - byte[] toBytes(Number value) { - if(value instanceof Double) { - return Bytes.toBytes(value.longValue()); + String suffix(Number value) { + if(value instanceof Byte) { + return "_b"; + } else if(value instanceof Double) { + return "_d"; } else if(value instanceof Float) { - return Bytes.toBytes(value.floatValue()); + return "_f"; } else if(value instanceof Integer) { - return Bytes.toBytes(value.intValue()); + return "_i"; } else if(value instanceof Long) { - return Bytes.toBytes(value.longValue()); - } else if(value instanceof Short) { - return Bytes.toBytes(value.shortValue()); + return "_l"; } else { - LOG.warn("Unknown type for value:"+value); - return null; + return "_?"; + } + } + + byte[] toBytes(Number value) { + if(value instanceof Byte) { + return new byte[] { value.byteValue() }; } + + byte[] bytes = null; + ByteArrayOutputStream dout = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(dout); + try{ + if(value instanceof Double) { + output.writeDouble(value.doubleValue()); + } else if(value instanceof Float) { + output.writeFloat(value.floatValue()); + } else if(value instanceof Integer) { + output.writeInt(value.intValue()); + } else if(value instanceof Short) { + output.writeShort(value.shortValue()); + } else if(value instanceof Long) { + output.writeLong(value.longValue()); + } else { + LOG.warn("Unkown data type: "+value); + } + bytes = dout.toByteArray(); + if(LOG.isDebugEnabled()) { + LOG.debug("bytes's length after value ("+value+") is converted: "+ + ((null!=bytes)?bytes.length:0)); + } + } catch(IOException ioe){ + LOG.warn("Fail writing data to output stream.", ioe); + } finally { + try { output.close(); } catch(IOException ioe) { + LOG.warn("Fail closing output stream.", ioe); } + } + return bytes; } } @@ -349,14 +393,16 @@ public final class Monitor extends Threa while(!Thread.currentThread().interrupted()) { Map<String, Task> tasks = Configurator.configure((HamaConfiguration)this.conf, listener); - for(Map.Entry<String, Task> entry: tasks.entrySet()) { - String jarPath = entry.getKey(); - Task t = entry.getValue(); - TaskWorker old = (TaskWorker) - ((ConcurrentMap)this.workers).putIfAbsent(jarPath, new TaskWorker(t)); - if(null != old) { - ((ConcurrentMap)this.workers).replace(jarPath, - new TaskWorker(t)); + if(null != tasks) { + for(Map.Entry<String, Task> entry: tasks.entrySet()) { + String jarPath = entry.getKey(); + Task t = entry.getValue(); + TaskWorker old = (TaskWorker) + ((ConcurrentMap)this.workers).putIfAbsent(jarPath, new TaskWorker(t)); + if(null != old) { + ((ConcurrentMap)this.workers).replace(jarPath, + new TaskWorker(t)); + } } } LOG.debug("Task worker list's size: "+workers.size()); Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java?rev=1308383&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java Mon Apr 2 15:26:37 2012 @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hama.monitor.Metric; +import org.apache.hama.monitor.MetricsRecord; +import org.apache.hama.monitor.Federator.Collector; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +/** + * Fire a action to harvest metrics from ZooKeeper. + */ +public final class ZKCollector implements Collector { + + public static final Log LOG = LogFactory.getLog(ZKCollector.class); + + private AtomicReference<Reference> reference = + new AtomicReference<Reference>(); + + static final class Reference { + final ZooKeeper zk; + final MetricsRecord record; // name for this search + final String path; // /path/to/metrics/parent/folder + public Reference(ZooKeeper zk, MetricsRecord record, String path) { + this.zk = zk; + this.record = record; + this.path = path; + } + } + + /** + * ZKCollector havests metrics information from ZooKeeper. + * @param zk is the target repository storing metrics. + * @param nameOfRecord is the name of MetricsRecord. + * @param descOfRecord is the description of MetricsRecord. + * @param path points to the <b>parent</b> directory of stored metrics. + */ + public ZKCollector(ZooKeeper zk, String nameOfRecord, String descOfRecord, + String path) { + Reference ref = this.reference.get(); + if(null == ref) { + this.reference.set(new Reference(zk, new MetricsRecord(nameOfRecord, + descOfRecord), path)); + } + } + + @Override + public Object harvest() throws Exception { + final String path = this.reference.get().path; + final ZooKeeper zk = this.reference.get().zk; + LOG.debug("Searching "+path+" in zookeeper."); + Stat stat = zk.exists(path, false); + if(null == stat) return null; // no need to collect data. + List<String> children = zk.getChildren(path, false); + if(LOG.isDebugEnabled()) { + LOG.debug("Leaves size is "+children.size()+" total znodes in list: "+ + children); + } + + // TODO: metrics record contains multiple metrics (1 to many) + // data is stored under zk e.g. /path/to/metrics/jvm/... + // within jvm folder metrics is stored in a form of name, value pair + final MetricsRecord record = reference.get().record; + if(null != children) { + for(String child: children) { + LOG.info("metrics -> "+child); + // <metricsName_d> indicates data type is double + String dataType = suffix(child); + byte[] dataInBytes = zk.getData(path+"/"+child, false, stat); + if(LOG.isDebugEnabled()) { + LOG.debug("Data length (in byte): "+dataInBytes.length); + } + String name = removeSuffix(child); + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(dataInBytes)); + if("d".equals(dataType)) { + double dv = input.readDouble(); + LOG.info("metrics "+name+" value:"+dv); + record.add(new Metric(name, dv)); + } else if("f".equals(dataType)) { + float fv = input.readFloat(); + LOG.info("metrics "+name+" value:"+fv); + record.add(new Metric(name, fv)); + } else if("i".equals(dataType)) { + int iv = input.readInt(); + LOG.info("metrics "+name+" value:"+iv); + record.add(new Metric(name, iv)); + } else if("l".equals(dataType)) { + long lv = input.readLong(); + LOG.info("metrics "+name+" value:"+lv); + record.add(new Metric(name, lv)); + } else if("b".equals(dataType)) { + LOG.info("metrics"+name+" value:"+dataInBytes); + record.add(new Metric(name, dataInBytes)); + } else { + LOG.warn("Unkown data type for metrics name: "+child); + } + try {} finally{ input.close();} + } + } + return record; + } + + private String removeSuffix(String path) { + return path.substring(0, path.length()-2); + } + + private String suffix(String path) { + if(!"_".equals(path.substring(path.length()-2, path.length()-1))) { + return "?"; + } + return path.substring(path.length()-1, path.length()); + } + +} Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1308383&r1=1308382&r2=1308383&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Mon Apr 2 15:26:37 2012 @@ -138,7 +138,6 @@ public class TestBSPMasterGroomServer ex /* * BEGIN: ZooKeeper tests. - */ public void testClearZKNodes() throws IOException, KeeperException, InterruptedException { @@ -194,6 +193,7 @@ public class TestBSPMasterGroomServer ex System.out.println("Node has been removed correctly!"); } } + */ /* * END: ZooKeeper tests. Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1308383&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Mon Apr 2 15:26:37 2012 @@ -0,0 +1,129 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hama.Constants; +import org.apache.hama.HamaCluster; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.examples.ClassSerializePrinting; +import org.apache.hama.zookeeper.QuorumPeer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +public class TestZooKeeper extends HamaCluster { + + private static Log LOG = LogFactory.getLog(TestZooKeeper.class); + + private HamaConfiguration configuration; + + public TestZooKeeper() { + configuration = new HamaConfiguration(); + configuration.set("bsp.master.address", "localhost"); + assertEquals("Make sure master addr is set to localhost:", "localhost", + configuration.get("bsp.master.address")); + configuration.set("bsp.local.dir", "/tmp/hama-test"); + configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost"); + configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810); + configuration.set("hama.sync.client.class", + org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class + .getCanonicalName()); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void tearDown() throws Exception { + super.tearDown(); + } + + public void testClearZKNodes() throws IOException, KeeperException, + InterruptedException { + + // Clear any existing znode with the same path as bspRoot. + bspCluster.getBSPMaster().clearZKNodes(); + + int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, + 6000); + String connectStr = QuorumPeer.getZKQuorumServersString(configuration); + String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, + Constants.DEFAULT_ZOOKEEPER_ROOT); + + // Establishing a zk session. + ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() { + @Override + public void process(WatchedEvent event) { + // Do nothing.(Dummy Watcher) + } + }); + + // Creating dummy bspRoot if it doesn't already exist. + Stat s = zk.exists(bspRoot, false); + if (s == null) { + zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + + // Creating dummy child nodes at depth 1. + String node1 = bspRoot + "/task1"; + String node2 = bspRoot + "/task2"; + zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Creating dummy child node at depth 2. + String node11 = node1 + "/superstep1"; + zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false); + assertEquals(2, list.size()); + System.out.println(list.size()); + + bspCluster.getBSPMaster().clearZKNodes(); + + list = (ArrayList<String>) zk.getChildren(bspRoot, false); + System.out.println(list.size()); + assertEquals(0, list.size()); + + try { + zk.getData(node11, false, null); + fail(); + } catch (KeeperException.NoNodeException e) { + System.out.println("Node has been removed correctly!"); + } + } + +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java?rev=1308383&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java Mon Apr 2 15:26:37 2012 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor; + +import java.util.Map; + +import junit.framework.TestCase; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hama.HamaConfiguration; +import org.apache.hama.monitor.Monitor.Task; + +public class TestConfigurator extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestConfigurator.class); + + /** + * If test fails, please check if `plugins' dir exists under "user.dir". + */ + public void testPluginDirNotPresented() throws Exception { + System.setProperty("hama.home.dir", System.getProperty("user.dir")); + Map<String, Task> tasks = Configurator.configure(new HamaConfiguration(), null); + LOG.info("Plugins dir is not created, returned tasks should be null -> "+tasks); + assertNull("Tasks returned should be null because no plugins dir is created.", tasks); + } + + +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java?rev=1308383&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java Mon Apr 2 15:26:37 2012 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import junit.framework.TestCase; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hama.HamaConfiguration; +import org.apache.hama.monitor.Federator.Act; +import org.apache.hama.monitor.Federator.Collector; +import org.apache.hama.monitor.Federator.CollectorHandler; + +public class TestFederator extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestFederator.class); + + Federator federator; + + final static int expected = 10; + + public static final class DummyCollector implements Collector { + + final AtomicInteger sum = new AtomicInteger(0); + + public DummyCollector(int value) { + sum.set(value); + } + + public Object harvest() throws Exception { + assertEquals("Test if value is equal before harvest.", expected, sum.get()); + int result = sum.incrementAndGet(); + Thread.sleep(2*1000); // simulate task execution which takes time. + assertEquals("Test if value is equal after harvest.", (expected+1), result); + return result; + } + + } + + public void setUp() throws Exception { + this.federator = new Federator(new HamaConfiguration()); + this.federator.start(); + } + + public void testExecutionFlow() throws Exception { + LOG.info("Value before submitted: "+expected); + final AtomicInteger finalResult = new AtomicInteger(0); + final Act act = new Act(new DummyCollector(expected), new CollectorHandler() { + public void handle(Future future) { + try { + finalResult.set(((Integer)future.get()).intValue()); + LOG.info("Value after submitted: "+finalResult); + } catch (ExecutionException ee) { + LOG.error(ee); + } catch (InterruptedException ie) { + LOG.error(ie); + Thread.currentThread().interrupt(); + } + } + }); + this.federator.register(act); + Thread.sleep(3*1000); + assertEquals("Result should be "+(expected+1)+".", finalResult.get(), (expected+1)); + } + + public void tearDown() throws Exception { + this.federator.interrupt(); + } + +}