Author: chl501
Date: Sun Apr  8 12:42:55 2012
New Revision: 1310987

URL: http://svn.apache.org/viewvc?rev=1310987&view=rev
Log:
[HAMA-370] Failure detector for Hama

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
    
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/
    
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
Removed:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/metrics/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/metrics/

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
 Sun Apr  8 12:42:55 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
/**
 * A component used by an application to query the liveness status of a
 * target node.
 */
public interface Interpreter {

  /**
   * Tells whether the node at the given address is considered alive.
   *
   * @param address address of the node to be checked.
   * @return true if the target node is considered alive; false otherwise.
   */
  boolean isAlive(String address);

}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java 
(added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java 
Sun Apr  8 12:42:55 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.distribution.NormalDistribution;
+import org.apache.commons.math.distribution.NormalDistributionImpl;
+import org.apache.commons.math.MathException;
+
+public final class Node{ 
+
+  public static final Log LOG = LogFactory.getLog(Node.class);
+
+  private final String address;
+  /**
+   * Sliding window that stores inter-arrival time. For instance,
+   * T={10, 12, 14, 17, 23, 25}  Tinter-arrival={2, 2, 3, 6, 2}
+   */
+  private final ArrayDeque<Double> samplingWindow; // fix size ws
+  private final int windowSize;
+  /* The latest heartbeat */
+  private final AtomicLong latestHeartbeat = new AtomicLong(0);
+
+  public Node(String address, int size){
+    this.address = address;
+    this.windowSize = size;
+    this.samplingWindow = new ArrayDeque<Double>(windowSize);
+    if(null == this.address) 
+      throw new NullPointerException("Address is not provided");
+  }
+
+  public String getAddress(){
+    return this.address;
+  }
+
+  void setLatestHeartbeat(long latestHeartbeat){
+    this.latestHeartbeat.set(latestHeartbeat);
+  }
+
+  public long getLatestHeartbeat(){
+    return this.latestHeartbeat.get();
+  }
+
+  public synchronized void reset(){
+    this.samplingWindow.clear();
+    setLatestHeartbeat(0); 
+  }
+
+  /**
+   * The size used for storing samples. 
+   * @return int value fixed without change over time.
+   */
+  public int windowSize(){
+    return windowSize;
+  }
+
+  /**
+   * Inter-arrival times data as array.
+   * @return Double array format.
+   */
+  public Double[] samples(){
+    return (Double[]) samplingWindow.toArray(
+           new Double[samplingWindow.size()]);
+  }
+
+  /**
+   * Store the latest inter-arrival time to sampling window.
+   * The head of the array will be dicarded. Newly received heartbeat
+   * is added to the tail of the sliding window.
+   * @param heartbeat value is the current timestamp the client .
+   */
+  public void add(long heartbeat){
+    if(null == this.samplingWindow)
+      throw new NullPointerException("Sampling windows not exist.");
+    synchronized(this.samplingWindow){
+      if(0 != getLatestHeartbeat()) {
+        if(samplingWindow.size() == this.windowSize){
+          samplingWindow.remove();
+        }
+        samplingWindow.add(new Double(heartbeat-getLatestHeartbeat()));
+      }
+      setLatestHeartbeat(heartbeat);
+    }
+  }
+
+  /**
+   * Calculate cumulative distribution function value according to 
+   * the current timestamp, heartbeat in sampling window, and last heartbeat. 
+   * @param timestamp is the current timestamp.
+   * @param samples contain inter-arrival time in the sampling window. 
+   * @return double value as cdf, which stays between 0.0 and 1.0. 
+   */
+  public double cdf(long timestamp, Double[] samples){
+    double cdf = -1d;
+    double mean = mean(samples);
+    double variance = variance(samples);
+    NormalDistribution cal = new NormalDistributionImpl(mean, variance);
+    try{
+         cdf = cal.cumulativeProbability(
+           ((double)timestamp-(double)getLatestHeartbeat()));
+    }catch(MathException me){
+       LOG.error("Fail to compute phi value.", me);
+    }
+    if(LOG.isDebugEnabled()) LOG.debug("Calcuated cdf:"+cdf);
+    return cdf;
+  }
+
+  /**
+   * Ouput phi value.  
+   * @param now is the current timestamp.
+   * @return phi value, which goes infinity when cdf returns 1, and 
+   *         stays -0.0 when cdf is 0. 
+   */
+  public double phi(long now){ 
+    return (-1) * Math.log10(1-cdf(now, this.samples()));
+  }
+
+  /**
+   * Mean of the samples.
+   * @return double value for mean of the samples.
+   */
+  public double mean(Double[] samples){
+    int len = samples.length;
+    if(0 >= len) 
+      throw new RuntimeException("Samples data does not exist.");
+    double sum = 0d;
+    for(double sample: samples){
+      sum += sample;
+    }
+    return sum/(double)len;
+  }
+
+  /**
+   * Standard deviation.
+   * @return double value of standard deviation.
+   */
+  public double variance(Double[] samples){
+    int len = samples.length;
+    double mean = mean(samples);
+    double sumd = 0d;
+    for(double sample: samples)  {
+      double v =  sample - mean;
+      sumd += v*v;
+    }
+    return sumd/(double)len;
+  }
+
+  @Override
+  public boolean equals(final Object target){
+    if (target == this)
+      return true;
+    if (null == target)
+      return false;
+    if (getClass() != target.getClass())
+      return false;
+
+    Node n = (Node) target;
+    if(!getAddress().equals(n.address))
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode(){
+    int result = 17;
+    result = 37 * result + address.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString(){
+    Double[] samples = samples();
+    StringBuilder builder = new StringBuilder();
+    for(double d: samples){
+      builder.append(" "+d+" ");
+    }
+    return "Node address:"+this.address+" mean:"+mean(samples)+" variance:"+
+           variance(samples)+" samples:["+builder.toString()+"]";
+  }
+}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java 
(added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java 
Sun Apr  8 12:42:55 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+
/**
 * Client side of the failure detector: something that announces its own
 * liveness by sending heartbeats to a supervisor.
 */
public interface Sensor {

  /**
   * Emits one heartbeat, signalling that this sensor still exists.
   *
   * @throws IOException if the heartbeat cannot be sent.
   */
  void heartbeat() throws IOException;
}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
 Sun Apr  8 12:42:55 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+/**
+ * A simple binary interpreter translates phi value to application
+ * for checking if a specific groom server is alive.
+ */
+public class SimpleBinaryInterpreter implements Interpreter{
+
+  private final Supervisor supervisor;
+
+  public SimpleBinaryInterpreter(Supervisor supervisor){
+    this.supervisor = supervisor;
+    if(null == this.supervisor) 
+      throw new NullPointerException("Supervisor is not provided.");
+  }
+
+  /**
+   * An output value represents the level of a node's status. 
+   * @param address to be checked.
+   * @return true indicates the target node is alive; false otherwise.
+   */
+  public boolean isAlive(String address){
+    if(Double.isInfinite(this.supervisor.suspicionLevel(address))){
+      return false;
+    }
+    return true;
+  }
+}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
 Sun Apr  8 12:42:55 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
/**
 * Server side of the failure detector. It receives heartbeats and turns
 * them into a per-endpoint suspicion level for an Interpreter to consume.
 */
public interface Supervisor {

  /**
   * Reports the suspicion level of the given endpoint.
   *
   * @param addr address of the endpoint to be checked.
   * @return the endpoint's suspicion level as a double.
   */
  double suspicionLevel(String addr);
}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
 Sun Apr  8 12:42:55 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Failure detector UDP client.
+ */
+public class UDPSensor implements Sensor, Runnable{
+
+  public static final Log LOG = LogFactory.getLog(UDPSensor.class);
+  /** 
+   * The default interval hearbeat.
+   */
+  private static long HEARTBEAT_INTERVAL; 
+
+  /* UDP server address and port */
+  private String address;
+  private int port;
+  private DatagramChannel channel;
+  private AtomicBoolean running = new AtomicBoolean(false);
+  private AtomicLong sequence = new AtomicLong(0);
+
+  /**
+   * Constructor for UDP client. Setting up configuration 
+   * and open DatagramSocket.
+   */
+  public UDPSensor(Configuration configuration){
+    this.address = 
+      ((HamaConfiguration)configuration).get("bsp.monitor.fd.udp_address", 
"localhost");
+    this.port = 
+      ((HamaConfiguration)configuration).getInt("bsp.monitor.fd.udp_port", 
16384);
+    HEARTBEAT_INTERVAL = ((HamaConfiguration)configuration).getInt(
+      "bsp.monitor.fd.heartbeat_interval", 100);
+    running.set(true);
+    try{
+      channel = DatagramChannel.open();
+    }catch(IOException ioe){
+      LOG.error("Fail to initialize udp channel.", ioe);
+    }
+  } 
+
+
+  /**
+   * The heartbeat function, signifying its existence.
+   */
+  public void heartbeat() throws IOException{
+    ByteBuffer heartbeat = ByteBuffer.allocate(8);
+    heartbeat.clear();
+    heartbeat.putLong(sequence.incrementAndGet()); 
+    heartbeat.flip();
+    channel.send(heartbeat, new InetSocketAddress(InetAddress.getByName(
+      this.address), this.port));
+    if(LOG.isDebugEnabled()){
+      LOG.debug("Heartbeat sequence "+sequence.get()+ " is sent to 
"+this.address+":"+ this.port);
+    }
+  }
+
+  public String getAddress(){
+    return this.address;
+  }
+  
+  public int getPort(){
+    return this.port;
+  }
+
+  public long heartbeatInterval(){
+    return HEARTBEAT_INTERVAL;
+  }
+
+  public void run(){
+    while(running.get()){
+      try{
+        heartbeat();
+        Thread.sleep(HEARTBEAT_INTERVAL);
+      }catch(InterruptedException ie){
+        LOG.error("UDPSensor is interrupted.", ie);
+        Thread.currentThread().interrupt();
+      }catch(IOException ioe){ 
+        LOG.error("Sensor fails in sending heartbeat.", ioe);
+      }
+    }
+    LOG.info("Sensor at "+this.address+" stops sending heartbeat.");
+  }
+
+  public void shutdown(){
+    running.set(false);
+    if(null != this.channel) {
+      try{ 
+        this.channel.socket().close();
+        this.channel.close(); 
+      }catch(IOException ioe){ 
+        LOG.error("Error closing sensor channel.",ioe); 
+      }
+    }
+  }
+
+
+  public boolean isShutdown(){
+    return this.channel.socket().isClosed() && !running.get();
+  }
+
+}

Added: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
 (added)
+++ 
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
 Sun Apr  8 12:42:55 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean; 
+import java.util.concurrent.Callable; 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*; 
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * UDP supervisor is responsible for receiving the 
+ * heartbeat and output suspicion level for Interpreter.
+ */
+public class UDPSupervisor implements Supervisor, Runnable{
+
+  public static final Log LOG = LogFactory.getLog(UDPSupervisor.class);
+
+  private static int WINDOW_SIZE = 100;
+  private static final List<Node> nodes = new CopyOnWriteArrayList<Node>();
+  private final ScheduledExecutorService sched;
+  private final DatagramChannel channel;
+  private AtomicBoolean running = new AtomicBoolean(false);
+
+  public static class Hermes implements Callable{
+    private final Node node;
+    private final long heartbeat;
+    private final long sequence;
+
+    /**
+     * Logic unit deals with sensors' status. 
+     * It first check if the packet coming is 1, indicating the arrival 
+     * of new packet. Then it checkes the node position within the list, 
+     * i.e., nodes. If -1 returns, a completely refresh packet arrives; 
+     * therefore adding node info to the nodes list; otherwise reseting 
+     * the node sampling window and the latest heartbeat value.
+     * If the packet comes with the sequence other than 1, meaning its
+     * status of continuous sending heartbeat, thus retrieve old node
+     * from list and process necessary steps, such as manipulating sample 
+     * windows and assigning the last heartbeat.
+     * @param address of specific node equipted with sensor.
+     * @param sequence number is generated by the sensor.
+     * @param heartbeat timestamped by the supervisor.
+     */
+    public Hermes(String address, final long sequence, final long heartbeat){
+      Node n = new Node(address, WINDOW_SIZE);
+      int p = nodes.indexOf(n);
+      Node tmp = null;
+      if(1L == sequence) {
+        if(-1 == p){// fresh
+          tmp = n;
+          nodes.add(tmp);
+        }else{// node crashed then restarted
+          tmp = nodes.get(p);
+          tmp.reset();
+        }
+      }else{
+        if(-1 == p){
+          LOG.warn("Non existing host ("+address+") is sending heartbeat"+
+          " sequence "+sequence+"!!!");
+        }else{
+          tmp = nodes.get(p);
+        }
+      }
+      this.node = tmp;
+      this.heartbeat = heartbeat;
+      this.sequence = sequence;
+    }
+
+    @Override
+    public Object call() throws Exception {
+      this.node.add(this.heartbeat);
+      return null;
+    }
+  }
+
+  /**
+   * UDP Supervisor.
+   */
+  public UDPSupervisor(Configuration conf){
+    DatagramChannel ch = null;
+    try{
+      ch = DatagramChannel.open();
+    }catch(IOException ioe){
+      LOG.error("Fail to open udp channel.", ioe);
+    }
+    this.channel = ch;
+    if(null == this.channel) throw new NullPointerException();
+    try{
+      this.channel.socket().bind((SocketAddress)new InetSocketAddress(
+        conf.getInt("bsp.monitor.fd.udp_port", 16384)));
+    }catch(SocketException se){
+      LOG.error("Unable to bind the udp socket.", se);
+    }
+    WINDOW_SIZE = conf.getInt("bsp.monitor.fd.window_size", 100);
+    sched = Executors.newScheduledThreadPool(conf.
+      getInt("bsp.monitor.fd.supervisor_threads", 20));
+  }
+
+  /**
+   * The output value represents the level of a node's status, 
+   * Normally called by Interpretor.
+   * @param addr to be checked. 
+   * @return double value as the suspicion level of the endpoint.
+   *         -1d indicates not found.
+   */
+  @Override
+  public double suspicionLevel(String addr) {
+    if(null == addr || "".equals(addr))
+      throw new NullPointerException("Target address is not provided.");
+    for(Node n: nodes){
+      if(addr.equals(n.getAddress())) {
+         return n.phi(System.currentTimeMillis());
+      }
+    }
+    return -1d;
+  }
+
+  public void run(){
+    ByteBuffer packet = ByteBuffer.allocate(8);
+    try{
+      running.set(true);
+      while(running.get()){
+        SocketAddress source = (InetSocketAddress)channel.receive(packet); 
+        packet.flip();
+        long seq = packet.getLong();
+        packet.clear();
+        if(LOG.isDebugEnabled()){
+          LOG.debug("seqence: "+seq+" src address: "+
+          ((InetSocketAddress)source).getAddress().getHostAddress());
+        }
+        sched.schedule(new Hermes(((InetSocketAddress)source).getAddress()
+          .getHostAddress(), seq, System.currentTimeMillis()), 0, SECONDS); 
+      }
+    }catch(IOException ioe){
+      LOG.error("Problem in receiving packet from channel.", ioe);
+      Thread.currentThread().interrupt();
+    }finally{
+      if(null != this.channel) 
+        try{ 
+          this.channel.socket().close();
+          this.channel.close(); 
+        }catch(IOException ioe){ 
+          LOG.error("Error closing supervisor channel.",ioe); 
+        }
+    }
+  }  
+
+  public void shutdown(){
+    running.set(false);
+    sched.shutdown();
+  }
+
+  public boolean isShutdown(){
+    return this.channel.socket().isClosed() && !running.get();
+  }
+}

Added: 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
URL: 
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java?rev=1310987&view=auto
==============================================================================
--- 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java 
(added)
+++ 
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java 
Sun Apr  8 12:42:55 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Test case for Phi accrual fail detector. 
+ */
+public class TestFD extends HamaCluster {
+  public static final Log LOG = LogFactory.getLog(TestFD.class);
+  final HamaConfiguration conf;
+  final ScheduledExecutorService sched;
+
+  public TestFD() {
+    this.conf = getConf();
+    this.sched = Executors.newScheduledThreadPool(10);
+  }
+
+  public void setUp() throws Exception { }
+
+  /**
+   * Test Phi Accrual Fialure Detector.
+   */
+  public void testCumulativeDistributedFunction() throws Exception {
+    this.conf.setInt("bsp.monitor.fd.udp_port", 9765);
+    UDPSupervisor server = new UDPSupervisor(this.conf);
+    UDPSensor client = new UDPSensor(this.conf); 
+    this.sched.schedule(server, 0, SECONDS);
+    this.sched.schedule(client, 2, SECONDS);
+    boolean flag = true;
+    int count = 0;
+    while(flag){
+      count++;
+      Thread.sleep(1000*3);
+      double phi = server.suspicionLevel("127.0.0.1");
+      if(LOG.isDebugEnabled())
+        LOG.debug("Phi value:"+phi+" 
Double.isInfinite(phi):"+Double.isInfinite(phi));
+      assertTrue("In normal case phi should not go infinity!", 
!Double.isInfinite(phi));
+      if(10 < count){
+        flag = false;
+      }
+    }
+    client.shutdown();
+    server.shutdown();
+    LOG.info("Finished testing suspicion level.");
+  }
+
+  /**
+   * Test when sensor fails.
+   */
+  public void testSensorFailure() throws Exception{
+    this.conf.setInt("bsp.monitor.fd.udp_port", 2874);
+    UDPSupervisor server = new UDPSupervisor(this.conf);
+    UDPSensor client = new UDPSensor(this.conf); 
+    this.sched.schedule(server, 0, SECONDS);
+    this.sched.schedule(client, 2, SECONDS);
+    int count = 0;
+    boolean flag = true;
+    while(flag){
+      count++;
+      double phi = server.suspicionLevel("127.0.0.1");
+      Thread.sleep(1000*3);
+      if(5 < count){
+        client.shutdown(); 
+        Thread.sleep(1000*4);
+        phi = server.suspicionLevel("127.0.0.1");
+        if(LOG.isDebugEnabled())
+          LOG.debug("Phi value should go infinity:"+Double.isInfinite(phi));
+        assertTrue("In normal case phi should not go infinity!", 
Double.isInfinite(phi));
+      }
+      if(10 < count){
+        flag = false;
+      }
+    }
+    server.shutdown();
+    LOG.info("Finished testing client failure case.");
+  }
+  
+  public void tearDown() throws Exception { 
+    sched.shutdown();
+  }
+}


Reply via email to