Author: chl501
Date: Mon Apr  2 15:26:37 2012
New Revision: 1308383

URL: http://svn.apache.org/viewvc?rev=1308383&view=rev
Log:
[HAMA-542] Federator for collecting GroomServer information

Added:
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
    
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/
    
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
    
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
Removed:
    incubator/hama/trunk/core/src/test/org/
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
    
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java 
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java 
Mon Apr  2 15:26:37 2012
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -69,11 +70,10 @@ GroomServerManager, Watcher {
 
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
   public static final String localModeMessage = "Local mode detected, no 
launch of the daemon needed.";
-  public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private HamaConfiguration conf;
-  private ZooKeeper zk = null;
+  ZooKeeper zk = null;
   private String bspRoot = null;
 
   /**
@@ -86,7 +86,7 @@ GroomServerManager, Watcher {
   static long JOBINIT_SLEEP_INTERVAL = 2000;
 
   // States
-  State state = State.INITIALIZING;
+  final AtomicReference<State> state = new 
AtomicReference<State>(State.INITIALIZING);
 
   // Attributes
   String masterIdentifier;
@@ -443,14 +443,12 @@ GroomServerManager, Watcher {
 
   public static BSPMaster startMaster(HamaConfiguration conf, String 
identifier)
       throws IOException, InterruptedException {
-
     BSPMaster result = new BSPMaster(conf, identifier);
+    // init zk root and child nodes
+    result.initZK(conf); // need init zk before scheduler starts
     result.taskScheduler.setGroomServerManager(result);
     result.taskScheduler.start();
 
-    // init zk root and child nodes
-    result.initZK(conf);
-
     return result;
   }
 
@@ -504,7 +502,6 @@ GroomServerManager, Watcher {
 
     } catch (Exception e) {
       LOG.warn("Could not clear zookeeper nodes.", e);
-
     }      
   }
 
@@ -587,9 +584,7 @@ GroomServerManager, Watcher {
 
     this.masterServer.start();
 
-    synchronized (this) {
-      state = State.RUNNING;
-    }
+    state.set(State.RUNNING);
 
     instructor = new Instructor();
     instructor.bind(ReportGroomStatusDirective.class,
@@ -671,10 +666,10 @@ GroomServerManager, Watcher {
     this.totalTaskCapacity = tasksPerGroom * numGroomServers;
 
     if (detailed) {
-      return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, 
state);
+      return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, 
state.get());
     } else {
       return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
-          state);
+          state.get());
     }
   }
 
@@ -865,7 +860,7 @@ GroomServerManager, Watcher {
   }
 
   public BSPMaster.State currentState() {
-    return this.state;
+    return this.state.get();
   }
 
   @Override

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java 
Mon Apr  2 15:26:37 2012
@@ -383,9 +383,7 @@ public class GroomServer implements Runn
       }
     }
 
-    if (conf.getBoolean("bsp.monitor.enabled", true)) {
-      // TODO: conf.get("bsp.monitor.class.impl", "Monitor.class")
-      // so user can switch to customized monitor impl if necessary.
+    if (conf.getBoolean("bsp.monitor.enabled", false)) {
       new Monitor(conf, zk, this.groomServerName).start();
     }
 

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
 (original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
 Mon Apr  2 15:26:37 2012
@@ -23,12 +23,29 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.GroomServerStatus;
 import org.apache.hama.ipc.GroomProtocol;
+import org.apache.hama.monitor.Federator;
+import org.apache.hama.monitor.Federator.Act;
+import org.apache.hama.monitor.Federator.CollectorHandler;
+import org.apache.hama.monitor.Metric;
+import org.apache.hama.monitor.MetricsRecord;
+import org.apache.hama.monitor.Monitor;
+import org.apache.hama.monitor.ZKCollector;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
  * A simple task scheduler. 
@@ -41,22 +58,27 @@ class SimpleTaskScheduler extends TaskSc
   public static final String PROCESSING_QUEUE = "processingQueue";
   public static final String FINISHED_QUEUE = "finishedQueue";
 
-  private QueueManager queueManager;
-  private volatile boolean initialized;
-  private JobListener jobListener;
-  private JobProcessor jobProcessor;
+  private final AtomicReference<QueueManager> queueManager = 
+    new AtomicReference<QueueManager>();
+  private AtomicBoolean initialized = new AtomicBoolean(false);
+  private final JobListener jobListener;
+  private final JobProcessor jobProcessor;
+  private final AtomicReference<Federator> federator = 
+    new AtomicReference<Federator>(); 
+  private final ConcurrentMap<String, MetricsRecord> repository = 
+    new ConcurrentHashMap<String, MetricsRecord>();
+  private final ScheduledExecutorService scheduler;
 
   private class JobListener extends JobInProgressListener {
     @Override
     public void jobAdded(JobInProgress job) throws IOException {
-      queueManager.initJob(job); // init task
-      queueManager.addJob(WAIT_QUEUE, job);
+      queueManager.get().initJob(job); // init task
+      queueManager.get().addJob(WAIT_QUEUE, job);
     }
 
     @Override
     public void jobRemoved(JobInProgress job) throws IOException {
-      // queueManager.removeJob(WAIT_QUEUE, job);
-      queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+      queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
     }
   }
 
@@ -70,19 +92,19 @@ class SimpleTaskScheduler extends TaskSc
      * JobInProgress from Wait Queue to Processing Queue.
      */
     public void run() {
-      if (false == initialized) {
+      if (!initialized.get()) {
         throw new IllegalStateException("SimpleTaskScheduler initialization"
             + " is not yet finished!");
       }
-      while (initialized) {
-        Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+      while (initialized.get()) {
+        Queue<JobInProgress> queue = queueManager.get().findQueue(WAIT_QUEUE);
         if (null == queue) {
           LOG.error(WAIT_QUEUE + " does not exist.");
           throw new NullPointerException(WAIT_QUEUE + " does not exist.");
         }
         // move a job from the wait queue to the processing queue
         JobInProgress j = queue.removeJob();
-        queueManager.addJob(PROCESSING_QUEUE, j);
+        queueManager.get().addJob(PROCESSING_QUEUE, j);
         // schedule
         Collection<GroomServerStatus> glist = groomServerManager
             .groomServerStatusKeySet();
@@ -161,32 +183,89 @@ class SimpleTaskScheduler extends TaskSc
     }
   }
 
+  /**
+   * Runnable that, on each run, registers one jvm-metrics Act per known groom.
+   */
+  private class JvmCollector implements Runnable {
+    final Federator federator;
+    final ZooKeeper zk; 
+    JvmCollector(final Federator federator, final ZooKeeper zk) {
+      this.federator = federator;
+      this.zk = zk;
+    } 
+    public void run() {
+      for(GroomServerStatus status: 
+          groomServerManager.groomServerStatusKeySet()) {
+        final String groom = status.getGroomName();
+        final String jvmPath = Monitor.MONITOR_ROOT_PATH+groom+"/metrics/jvm";
+        final Act act = 
+          new Act(new ZKCollector(zk, "jvm", "Jvm metrics.", jvmPath), 
+                  new CollectorHandler() {
+            public void handle(Future future) {
+              try {
+                MetricsRecord record = (MetricsRecord)future.get(); // waits for the harvest
+                if(null != record) {
+                  if(LOG.isDebugEnabled()) {
+                    for(Metric metric: record.metrics()) {
+                      LOG.debug("Metric name:"+metric.name()+" metric 
value:"+metric.value());
+                    }
+                  }
+                  repository.put(groom, record); // replaces any earlier record of this groom
+                }
+              } catch (InterruptedException ie) {
+                LOG.warn(ie);
+                Thread.currentThread().interrupt();
+              } catch (ExecutionException ee) {
+                LOG.warn(ee.getCause()); // the exception thrown by the collector itself
+              }
+            }        
+          }); 
+        this.federator.register(act);
+      } 
+    }
+  }
+
   public SimpleTaskScheduler() {
     this.jobListener = new JobListener();
     this.jobProcessor = new JobProcessor();
+    this.scheduler = Executors.newSingleThreadScheduledExecutor();
   }
 
   @Override
   public void start() {
-    this.queueManager = new QueueManager(getConf()); // TODO: need factory?
-    this.queueManager.createFCFSQueue(WAIT_QUEUE);
-    this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
-    this.queueManager.createFCFSQueue(FINISHED_QUEUE);
+    if(initialized.get()) 
+      throw new 
IllegalStateException(SimpleTaskScheduler.class.getSimpleName()+
+      " is started.");
+    this.queueManager.set(new QueueManager(getConf())); 
+    this.federator.set(new Federator((HamaConfiguration)getConf()));
+    this.queueManager.get().createFCFSQueue(WAIT_QUEUE);
+    this.queueManager.get().createFCFSQueue(PROCESSING_QUEUE);
+    this.queueManager.get().createFCFSQueue(FINISHED_QUEUE);
     groomServerManager.addJobInProgressListener(this.jobListener);
-    this.initialized = true;
+    this.initialized.set(true);
+    if(null != getConf() && 
+       getConf().getBoolean("bsp.federator.enabled", false)) {
+      this.federator.get().start();
+    }
     this.jobProcessor.start();
+    if(null != getConf() && 
+       getConf().getBoolean("bsp.federator.enabled", false)) {
+      this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(), 
+      ((BSPMaster)groomServerManager).zk), 5, 5, SECONDS);
+    }
   }
 
   @Override
   public void terminate() {
-    this.initialized = false;
+    this.initialized.set(false);
     if (null != this.jobListener)
       groomServerManager.removeJobInProgressListener(this.jobListener);
+    this.jobProcessor.interrupt();
+    this.federator.get().interrupt();
   }
 
   @Override
   public Collection<JobInProgress> getJobs(String queue) {
-    return (queueManager.findQueue(queue)).jobs();
-    // return jobQueue;
+    return (queueManager.get().findQueue(queue)).jobs();
   }
 }

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
 (original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
 Mon Apr  2 15:26:37 2012
@@ -47,8 +47,11 @@ public final class Configurator {
     new ConcurrentHashMap<String, Long>();
 
   /**
+   * Configure plugins directory for monitoring GroomServer.
    * @param conf file points out the plugin dir location.
-   * @return Map contains jar path and task to be executed.
+   * @return Map contains jar path and task to be executed; null if 
+   *             plugin directory, default set to $HAMA_HOME/plugins, doesn't 
+   *             exist. 
    */ 
   public static Map<String, Task> configure(HamaConfiguration conf, 
       MonitorListener listener) throws IOException {
@@ -56,6 +59,7 @@ public final class Configurator {
     String pluginPath = conf.get("bsp.monitor.plugins.dir", 
       hamaHome+File.separator+DEFAULT_PLUGINS_DIR);
     File pluginDir = new File(pluginPath);
+    if(null == pluginDir || null == pluginDir.listFiles()) return null; 
     ClassLoader loader = Thread.currentThread().getContextClassLoader();
     Map<String, Task> taskList = new HashMap<String, Task>();
     LOG.debug("Scanning jar files within "+pluginDir+".");

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java?rev=1308383&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java 
(added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Federator.java 
Mon Apr  2 15:26:37 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaConfiguration;
+
+public final class Federator extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(Federator.class);
+ 
+  private final HamaConfiguration configuration;
+
+  private final ExecutorService workers;
+
+  private final BlockingQueue<Act> commands = new LinkedBlockingQueue<Act>();
+
+  private static class ServiceWorker implements Callable<Object> {
+    // Adapter that lets the worker pool run a Collector as a Callable.
+    final Collector collector;
+
+    public ServiceWorker(final Collector collector) {
+      this.collector = collector;
+    }
+
+    @Override
+    public Object call() throws Exception {
+      return this.collector.harvest(); // result (or exception) surfaces through the Future
+    }
+
+  }
+ 
+  /**
+   * A token that binds a collector to the handler of that collector's result.
+   */
+  public static final class Act {
+    final Collector collector; // produces the data, see Collector#harvest()
+    final CollectorHandler handler; // receives the Future of that data
+
+    public Act(final Collector collector, final CollectorHandler handler) {
+      this.collector = collector;
+      this.handler = handler;
+    }
+
+    public final Collector collector() {
+      return this.collector;
+    }
+
+    public final CollectorHandler handler() {
+      return this.handler;
+    }
+
+  }
+
+  /**
+   * Callback that handles the result returned by a Collector.
+   */
+  public static interface CollectorHandler {
+
+    /**
+     * Handles the result; the future is the one returned when the collector was submitted.
+     */
+    void handle(Future future); 
+
+  }
+  
+  /**
+   * Collects GroomServer information from a repository.
+   */
+  public static interface Collector {
+
+    /**
+     * Called to collect GroomServer information from a specific place.
+     * The returned object is what the CollectorHandler obtains from its Future.
+     */
+    Object harvest() throws Exception;
+
+  }
+
+  public Federator(final HamaConfiguration configuration) {
+    this.configuration = configuration; // stored only; not read anywhere in this class
+    this.workers = Executors.newCachedThreadPool(); // pool that executes the collectors
+    setName(Federator.class.getSimpleName());
+    setDaemon(true); // daemon thread: does not keep the JVM alive
+  }
+
+  public final void register(final Act act) { // enqueues act for the run() loop
+    try {
+      if(null == act || null == act.collector() || null == act.handler())
+        throw new NullPointerException("Collector or CollectorHandler "+
+        " is not provided."); 
+      commands.put(act); // queue was created without a capacity bound
+    } catch (InterruptedException ie) {
+      LOG.error(ie);
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+    }
+  }
+
+  public void run() { // takes each registered act and submits its collector to the pool
+    try {
+      while(!Thread.currentThread().isInterrupted()) { // query only; flag stays set
+        Act act = commands.take(); // blocks until an act is registered
+        act.handler().handle(workers.submit(new ServiceWorker(act.collector())));
+      }
+    } catch (InterruptedException ie) {
+      LOG.error(ie);
+      Thread.currentThread().interrupt(); // take() cleared the flag; set it again
+    }
+  }
+
+}

Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java 
(original)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java 
Mon Apr  2 15:26:37 2012
@@ -17,7 +17,8 @@
  */
 package org.apache.hama.monitor;
 
-import java.io.File;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 public final class Monitor extends Thread implements MonitorListener { 
 
   public static final Log LOG = LogFactory.getLog(Monitor.class);
+  public static final String MONITOR_ROOT_PATH = "/monitor/";
 
   private final Map<String, TaskWorker> workers =  // <jar path, task worker>
     new ConcurrentHashMap<String, TaskWorker>();
@@ -192,25 +194,32 @@ public final class Monitor extends Threa
     public void handle(Result result) {
       Object obj = result.get(); 
       if(obj instanceof MetricsRecord) {
-        String znode = 
"/monitor/"+this.groomServerName+"/metrics/"+result.name();
+        String znode = 
+          MONITOR_ROOT_PATH+this.groomServerName+"/metrics/"+result.name();
         ZKUtil.create(zk, znode); // recursively create znode path
         MetricsRecord record = (MetricsRecord) obj;
+        int cnt = 0;
         for(Metric<? extends Number> metric: record.metrics()) {
+          cnt++;
           String name = metric.name();
           Number value = metric.value();
           try {
             // znode must exists so that child (znode/name) can be created.
             if(null != this.zk.exists(znode, false)) { 
-              if(LOG.isDebugEnabled())
-                LOG.debug("Name & value are going to be publish to zk -> 
["+name+"] ["+value+"]");
-              if(null == zk.exists(znode+File.separator+name, false)) {
-                String p = this.zk.create(znode+File.separator+name, 
toBytes(value), 
+              String suffix = suffix(value);;
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Publish name ["+name+"] and value ["+value+
+                "] to zk.");
+              }
+              final String zpath = znode+ZKUtil.ZK_SEPARATOR+name+suffix;
+              if(null == zk.exists(zpath, false)) {
+                String p = this.zk.create(zpath, toBytes(value), 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                LOG.debug("Successfully publish data to zk with path to 
`"+p+"'");
+                LOG.debug("Publish data to zk with path to `"+p+"'");
               } else {
                 // can we just update by increasing 1 version?
-                this.zk.setData(znode+File.separator+name, toBytes(value), 
-1); 
-                LOG.debug("Successfully update data in znode: "+znode);
+                this.zk.setData(zpath, toBytes(value), -1); 
+                LOG.debug("Update data in znode: "+znode);
               }
             }
           } catch (KeeperException ke) {
@@ -225,21 +234,56 @@ public final class Monitor extends Threa
       }
     }
 
-    byte[] toBytes(Number value) {
-      if(value instanceof Double) {
-        return Bytes.toBytes(value.longValue());
+    String suffix(Number value) {
+      if(value instanceof Byte)  {
+        return "_b";
+      } else if(value instanceof Double) {
+        return "_d";
       } else if(value instanceof Float) {
-        return Bytes.toBytes(value.floatValue());
+        return "_f";
       } else if(value instanceof Integer) {
-        return Bytes.toBytes(value.intValue());
+        return "_i";
       } else if(value instanceof Long) {
-        return Bytes.toBytes(value.longValue());
-      } else if(value instanceof Short) {
-        return Bytes.toBytes(value.shortValue());
+        return "_l";
       } else {
-        LOG.warn("Unknown type for value:"+value);
-        return null;
+        return "_?";
+      } 
+    } 
+
+    byte[] toBytes(Number value) {
+      if(value instanceof Byte) { 
+        return new byte[] { value.byteValue() };
       }
+
+      byte[] bytes = null;
+      ByteArrayOutputStream dout = new ByteArrayOutputStream();
+      DataOutputStream output = new DataOutputStream(dout);
+      try{ 
+        if(value instanceof Double) {
+          output.writeDouble(value.doubleValue());
+        } else if(value instanceof Float) {
+          output.writeFloat(value.floatValue()); 
+        } else if(value instanceof Integer) {
+          output.writeInt(value.intValue()); 
+        } else if(value instanceof Short) {
+          output.writeShort(value.shortValue()); 
+        } else if(value instanceof Long) {
+          output.writeLong(value.longValue()); 
+        } else {
+          LOG.warn("Unkown data type: "+value);
+        }
+        bytes = dout.toByteArray();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("bytes's length after value ("+value+") is converted: "+
+          ((null!=bytes)?bytes.length:0));
+        }
+      } catch(IOException ioe){
+        LOG.warn("Fail writing data to output stream.", ioe);
+      } finally { 
+        try { output.close(); } catch(IOException ioe) {
+        LOG.warn("Fail closing output stream.", ioe); } 
+      }
+      return bytes;
     }
 
   }
@@ -349,14 +393,16 @@ public final class Monitor extends Threa
         while(!Thread.currentThread().interrupted()) {
           Map<String, Task> tasks = 
             Configurator.configure((HamaConfiguration)this.conf, listener);
-          for(Map.Entry<String, Task> entry: tasks.entrySet()) {
-            String jarPath = entry.getKey();
-            Task t = entry.getValue();
-            TaskWorker old = (TaskWorker)
-              ((ConcurrentMap)this.workers).putIfAbsent(jarPath, new 
TaskWorker(t));
-            if(null != old) {
-              ((ConcurrentMap)this.workers).replace(jarPath, 
-              new TaskWorker(t));
+          if(null != tasks) {
+            for(Map.Entry<String, Task> entry: tasks.entrySet()) {
+              String jarPath = entry.getKey();
+              Task t = entry.getValue();
+              TaskWorker old = (TaskWorker)
+                ((ConcurrentMap)this.workers).putIfAbsent(jarPath, new 
TaskWorker(t));
+              if(null != old) {
+                ((ConcurrentMap)this.workers).replace(jarPath, 
+                new TaskWorker(t));
+              }
             }
           }
           LOG.debug("Task worker list's size: "+workers.size());

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java?rev=1308383&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
 Mon Apr  2 15:26:37 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.monitor.Metric;
+import org.apache.hama.monitor.MetricsRecord;
+import org.apache.hama.monitor.Federator.Collector;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Fires an action to harvest metrics from ZooKeeper.
+ */
+public final class ZKCollector implements Collector {
+
+  public static final Log LOG = LogFactory.getLog(ZKCollector.class); 
+
+  private AtomicReference<Reference> reference = 
+    new AtomicReference<Reference>();
+
+  static final class Reference {
+    final ZooKeeper zk;
+    final MetricsRecord record; // record that harvest() fills with the metrics it reads
+    final String path; // /path/to/metrics/parent/folder 
+    public Reference(ZooKeeper zk, MetricsRecord record, String path) {
+      this.zk = zk;
+      this.record = record;
+      this.path = path;
+    }
+  } 
+
+  /**
+   * ZKCollector harvests metrics information from ZooKeeper. 
+   * @param zk is the target repository storing metrics.
+   * @param nameOfRecord is the name of MetricsRecord.
+   * @param descOfRecord is the description of MetricsRecord.
+   * @param path points to the <b>parent</b> directory of stored metrics. 
+   */
+  public ZKCollector(ZooKeeper zk, String nameOfRecord, String descOfRecord, 
+      String path) {
+    Reference ref = this.reference.get();
+    if(null == ref) { // the field initializer creates an empty reference, so ref is null here
+      this.reference.set(new Reference(zk, new MetricsRecord(nameOfRecord, 
+      descOfRecord), path));
+    }
+  }
+
+  /** Reads each child znode under path into the record; null if path is absent. */
+  @Override
+  public Object harvest() throws Exception {
+    final String path = this.reference.get().path;
+    final ZooKeeper zk = this.reference.get().zk;
+    LOG.debug("Searching "+path+" in zookeeper.");
+    Stat stat = zk.exists(path, false);
+    if(null == stat) return null; // no need to collect data.
+    List<String> children = zk.getChildren(path, false);
+    if(null != children && LOG.isDebugEnabled()) {
+      LOG.debug("Leaves size is "+children.size()+" total znodes in list: "+
+      children);
+    }
+    // TODO: metrics record contains multiple metrics (1 to many); data is stored
+    // under zk, e.g. /path/to/metrics/jvm/..., as name/value pairs (one znode each).
+    final MetricsRecord record = reference.get().record;
+    if(null != children) {
+      for(String child: children) {
+        LOG.info("metrics -> "+child);
+        // <metricsName_d> indicates data type is double  
+        String dataType = suffix(child); 
+        byte[] dataInBytes = zk.getData(path+"/"+child, false, stat);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Data length (in byte): "+dataInBytes.length); 
+        }
+        String name = removeSuffix(child);
+        DataInputStream input = 
+          new DataInputStream(new ByteArrayInputStream(dataInBytes));
+        if("d".equals(dataType)) {
+          double dv = input.readDouble();
+          LOG.info("metrics "+name+" value:"+dv);
+          record.add(new Metric(name, dv)); 
+        } else if("f".equals(dataType)) {
+          float fv = input.readFloat();
+          LOG.info("metrics "+name+" value:"+fv);
+          record.add(new Metric(name, fv)); 
+        } else if("i".equals(dataType)) {
+          int iv = input.readInt();
+          LOG.info("metrics "+name+" value:"+iv);
+          record.add(new Metric(name, iv)); 
+        } else if("l".equals(dataType)) {
+          long lv = input.readLong();
+          LOG.info("metrics "+name+" value:"+lv);
+          record.add(new Metric(name, lv)); 
+        } else if("b".equals(dataType)) {
+          byte bv = input.readByte(); // Monitor publishes a Byte as exactly one byte
+          LOG.info("metrics "+name+" value:"+bv);
+          record.add(new Metric(name, bv)); 
+        } else {
+          LOG.warn("Unkown data type for metrics name: "+child);
+        }
+        input.close(); // in-memory stream; replaces the former no-op try/finally
+      }
+    }
+    return record; 
+  }
+
+  private String removeSuffix(String path) { // "name_d" -> "name"; too-short names unchanged
+    return path.length() < 2 ? path : path.substring(0, path.length()-2);
+  }
+
+  private String suffix(String path) { // "name_d" -> "d"; "?" when there is no "_x" tail
+    if(path.length() < 2 || '_' != path.charAt(path.length()-2)) {
+      return "?";
+    }
+    return path.substring(path.length()-1, path.length());
+  }   
+
+}

Modified: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1308383&r1=1308382&r2=1308383&view=diff
==============================================================================
--- 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
 (original)
+++ 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
 Mon Apr  2 15:26:37 2012
@@ -138,7 +138,6 @@ public class TestBSPMasterGroomServer ex
 
   /*
    * BEGIN: ZooKeeper tests.
-   */
 
   public void testClearZKNodes() throws IOException, KeeperException,
       InterruptedException {
@@ -194,6 +193,7 @@ public class TestBSPMasterGroomServer ex
       System.out.println("Node has been removed correctly!");
     }
   }
+   */
 
   /*
    * END: ZooKeeper tests.

Added: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1308383&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java 
(added)
+++ 
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java 
Mon Apr  2 15:26:37 2012
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.ClassSerializePrinting;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Checks that {@code BSPMaster#clearZKNodes()} removes the znodes created
+ * below the configured BSP root, including nested ones.
+ */
+public class TestZooKeeper extends HamaCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestZooKeeper.class);
+
+  private final HamaConfiguration configuration;
+
+  public TestZooKeeper() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Creates two children under the BSP root (one of them with a grandchild),
+   * calls {@code clearZKNodes()} and verifies that the root has no children
+   * left and that the grandchild can no longer be read.
+   */
+  public void testClearZKNodes() throws IOException, KeeperException,
+      InterruptedException {
+
+    // Clear any existing znode with the same path as bspRoot.
+    bspCluster.getBSPMaster().clearZKNodes();
+
+    int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
+        6000);
+    String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
+    String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+
+    // Establishing a zk session.
+    ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Do nothing.(Dummy Watcher)
+      }
+    });
+
+    try {
+      // Creating dummy bspRoot if it doesn't already exist.
+      Stat s = zk.exists(bspRoot, false);
+      if (s == null) {
+        zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+      }
+
+      // Creating dummy child nodes at depth 1.
+      String node1 = bspRoot + "/task1";
+      String node2 = bspRoot + "/task2";
+      zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+      zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+
+      // Creating dummy child node at depth 2.
+      String node11 = node1 + "/superstep1";
+      zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+
+      // getChildren() is declared to return List<String>; only the size is
+      // needed, so no downcast to a concrete list type is performed.
+      int childCount = zk.getChildren(bspRoot, false).size();
+      assertEquals(2, childCount);
+      System.out.println(childCount);
+
+      bspCluster.getBSPMaster().clearZKNodes();
+
+      childCount = zk.getChildren(bspRoot, false).size();
+      System.out.println(childCount);
+      assertEquals(0, childCount);
+
+      try {
+        zk.getData(node11, false, null);
+        fail();
+      } catch (KeeperException.NoNodeException e) {
+        System.out.println("Node has been removed correctly!");
+      }
+    } finally {
+      // Release the client session whether or not the assertions passed.
+      zk.close();
+    }
+  }
+
+}

Added: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java?rev=1308383&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
 (added)
+++ 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
 Mon Apr  2 15:26:37 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.monitor.Monitor.Task;
+
+/**
+ * Tests the result of {@link Configurator#configure} when no plugins
+ * directory is expected to exist under the configured home directory.
+ */
+public class TestConfigurator extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestConfigurator.class);
+
+  /**
+   * If test fails, please check if `plugins' dir exists under "user.dir".
+   */
+  public void testPluginDirNotPresented() throws Exception {
+    final String key = "hama.home.dir";
+    final String previous = System.getProperty(key);
+    System.setProperty(key, System.getProperty("user.dir"));
+    try {
+      Map<String, Task> tasks =
+          Configurator.configure(new HamaConfiguration(), null);
+      LOG.info("Plugins dir is not created, returned tasks should be null -> "
+          + tasks);
+      assertNull(
+          "Tasks returned should be null because no plugins dir is created.",
+          tasks);
+    } finally {
+      // The property is JVM-wide; put back whatever was there before so the
+      // setting does not leak out of this test method.
+      if (previous == null) {
+        System.clearProperty(key);
+      } else {
+        System.setProperty(key, previous);
+      }
+    }
+  }
+
+}

Added: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java?rev=1308383&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
 (added)
+++ 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
 Mon Apr  2 15:26:37 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import junit.framework.TestCase;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.monitor.Federator.Act;
+import org.apache.hama.monitor.Federator.Collector;
+import org.apache.hama.monitor.Federator.CollectorHandler;
+
+/**
+ * Registers an {@link Act} with a started {@link Federator} and checks that
+ * the value produced by the collector reaches the handler.
+ */
+public class TestFederator extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestFederator.class);
+
+  Federator federator;
+
+  static final int expected = 10;
+
+  /**
+   * Collector seeded with a start value; a harvest increments that value by
+   * one, pauses for two seconds and returns the incremented value.
+   */
+  public static final class DummyCollector implements Collector {
+
+    final AtomicInteger sum = new AtomicInteger(0);
+
+    public DummyCollector(int value) {
+      sum.set(value);
+    }
+
+    @Override
+    public Object harvest() throws Exception {
+      assertEquals("Test if value is equal before harvest.", expected,
+          sum.get());
+      int result = sum.incrementAndGet();
+      Thread.sleep(2*1000); // simulate task execution which takes time.
+      assertEquals("Test if value is equal after harvest.", (expected+1),
+          result);
+      return result;
+    }
+
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    this.federator = new Federator(new HamaConfiguration());
+    this.federator.start();
+  }
+
+  /**
+   * Submits a {@link DummyCollector} and expects the handler to have stored
+   * {@code expected + 1} within three seconds of registration.
+   */
+  public void testExecutionFlow() throws Exception {
+    LOG.info("Value before submitted: "+expected);
+    final AtomicInteger finalResult = new AtomicInteger(0);
+    final CollectorHandler handler = new CollectorHandler() {
+      @Override
+      public void handle(Future future) {
+        try {
+          finalResult.set(((Integer)future.get()).intValue());
+          LOG.info("Value after submitted: "+finalResult);
+        } catch (ExecutionException ee) {
+          LOG.error(ee);
+        } catch (InterruptedException ie) {
+          LOG.error(ie);
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+    final Act act = new Act(new DummyCollector(expected), handler);
+    this.federator.register(act);
+    // Harvest sleeps two seconds; wait a little longer before checking.
+    Thread.sleep(3*1000);
+    // JUnit's signature is (message, expected, actual); keep that order so a
+    // failure report labels the two values correctly.
+    assertEquals("Result should be "+(expected+1)+".", (expected+1),
+        finalResult.get());
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    this.federator.interrupt();
+  }
+
+}


Reply via email to