Author: chl501
Date: Sun Apr 8 12:42:55 2012
New Revision: 1310987
URL: http://svn.apache.org/viewvc?rev=1310987&view=rev
Log:
[HAMA-370] Failure detector for Hama
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
Removed:
incubator/hama/trunk/core/src/main/java/org/apache/hama/metrics/
incubator/hama/trunk/core/src/test/java/org/apache/hama/metrics/
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+/**
+ * A component used by an application to query the status of a target node.
+ */
+public interface Interpreter {
+
+ /**
+ * Reports whether the node identified by the given address is alive.
+ * @param address of the node to be checked.
+ * @return true indicates the target node is alive; false otherwise.
+ */
+ boolean isAlive(String address);
+
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.distribution.NormalDistribution;
+import org.apache.commons.math.distribution.NormalDistributionImpl;
+import org.apache.commons.math.MathException;
+
+/**
+ * Record kept for one monitored endpoint: its address, the timestamp of
+ * the latest heartbeat and a bounded sliding window of heartbeat
+ * inter-arrival times, from which mean, variance, cdf and phi are derived.
+ * Every access to the sliding window is guarded by the window's own monitor.
+ */
+public final class Node{
+
+  public static final Log LOG = LogFactory.getLog(Node.class);
+
+  private final String address;
+  /**
+   * Sliding window that stores inter-arrival time. For instance,
+   * T={10, 12, 14, 17, 23, 25} Tinter-arrival={2, 2, 3, 6, 2}
+   * The deque is not thread-safe, so it also serves as the lock object
+   * for every read and write of the window.
+   */
+  private final ArrayDeque<Double> samplingWindow; // fix size ws
+  private final int windowSize;
+  /* The latest heartbeat; 0 means no heartbeat has been recorded. */
+  private final AtomicLong latestHeartbeat = new AtomicLong(0);
+
+  /**
+   * @param address identifying the monitored endpoint; must not be null.
+   * @param size is the maximum number of inter-arrival samples to keep.
+   * @throws NullPointerException if address is null.
+   */
+  public Node(String address, int size){
+    if(null == address)
+      throw new NullPointerException("Address is not provided");
+    this.address = address;
+    this.windowSize = size;
+    this.samplingWindow = new ArrayDeque<Double>(size);
+  }
+
+  public String getAddress(){
+    return this.address;
+  }
+
+  void setLatestHeartbeat(long latestHeartbeat){
+    this.latestHeartbeat.set(latestHeartbeat);
+  }
+
+  public long getLatestHeartbeat(){
+    return this.latestHeartbeat.get();
+  }
+
+  /**
+   * Forget all samples and the latest heartbeat, e.g. when the endpoint
+   * restarts its heartbeat sequence.
+   */
+  public void reset(){
+    // same lock as add()/samples(), so a reset never interleaves with them
+    synchronized(this.samplingWindow){
+      this.samplingWindow.clear();
+      setLatestHeartbeat(0);
+    }
+  }
+
+  /**
+   * The size used for storing samples.
+   * @return int value fixed without change over time.
+   */
+  public int windowSize(){
+    return windowSize;
+  }
+
+  /**
+   * Inter-arrival times data as array.
+   * @return Double array holding a snapshot of the sampling window,
+   *         oldest sample first; empty when no sample is stored.
+   */
+  public Double[] samples(){
+    synchronized(this.samplingWindow){
+      return samplingWindow.toArray(new Double[samplingWindow.size()]);
+    }
+  }
+
+  /**
+   * Store the latest inter-arrival time to sampling window.
+   * When the window is full the head of the window is discarded. The newly
+   * received heartbeat is added to the tail of the sliding window. The very
+   * first heartbeat (latest heartbeat still 0) only records the timestamp,
+   * because no inter-arrival time can be computed yet.
+   * @param heartbeat value is the timestamp at which the heartbeat arrived.
+   */
+  public void add(long heartbeat){
+    synchronized(this.samplingWindow){
+      final long previous = getLatestHeartbeat();
+      if(0 != previous) {
+        if(samplingWindow.size() == this.windowSize){
+          samplingWindow.remove();
+        }
+        samplingWindow.add(Double.valueOf(heartbeat - previous));
+      }
+      setLatestHeartbeat(heartbeat);
+    }
+  }
+
+  /**
+   * Calculate cumulative distribution function value according to
+   * the current timestamp, heartbeat in sampling window, and last heartbeat.
+   * The inter-arrival times are modelled as a normal distribution
+   * parameterised by the sample mean and the sample standard deviation.
+   * NOTE(review): a zero deviation (all samples identical) is presumably
+   * rejected by NormalDistributionImpl - confirm and decide on a policy.
+   * @param timestamp is the current timestamp.
+   * @param samples contain inter-arrival time in the sampling window.
+   * @return double value as cdf, which stays between 0.0 and 1.0;
+   *         -1 if the distribution fails with a MathException.
+   * @throws RuntimeException if samples is empty.
+   */
+  public double cdf(long timestamp, Double[] samples){
+    double cdf = -1d;
+    final double mean = mean(samples);
+    // NormalDistributionImpl takes the standard deviation as its second
+    // argument, hence the square root of the variance.
+    final double deviation = Math.sqrt(variance(samples));
+    NormalDistribution cal = new NormalDistributionImpl(mean, deviation);
+    try{
+      cdf = cal.cumulativeProbability(
+        ((double)timestamp-(double)getLatestHeartbeat()));
+    }catch(MathException me){
+      LOG.error("Fail to compute phi value.", me);
+    }
+    if(LOG.isDebugEnabled()) LOG.debug("Calcuated cdf:"+cdf);
+    return cdf;
+  }
+
+  /**
+   * Output phi value, computed as -log10(1 - cdf).
+   * @param now is the current timestamp.
+   * @return phi value, which goes infinity when cdf returns 1, and
+   *         stays -0.0 when cdf is 0.
+   * @throws RuntimeException if the sampling window is empty.
+   */
+  public double phi(long now){
+    return (-1) * Math.log10(1-cdf(now, this.samples()));
+  }
+
+  /**
+   * Mean of the samples.
+   * @param samples to be averaged; must contain at least one value.
+   * @return double value for mean of the samples.
+   * @throws RuntimeException if samples is empty.
+   */
+  public double mean(Double[] samples){
+    final int len = samples.length;
+    if(0 >= len)
+      throw new RuntimeException("Samples data does not exist.");
+    double sum = 0d;
+    for(double sample: samples){
+      sum += sample;
+    }
+    return sum/(double)len;
+  }
+
+  /**
+   * Variance of the samples (sum of squared deviations from the mean
+   * divided by the number of samples).
+   * @param samples to be measured; must contain at least one value.
+   * @return double value of the variance.
+   * @throws RuntimeException if samples is empty.
+   */
+  public double variance(Double[] samples){
+    final int len = samples.length;
+    final double mean = mean(samples);
+    double sumd = 0d;
+    for(double sample: samples) {
+      final double v = sample - mean;
+      sumd += v*v;
+    }
+    return sumd/(double)len;
+  }
+
+  /**
+   * Two nodes are equal when they carry the same address.
+   */
+  @Override
+  public boolean equals(final Object target){
+    if (target == this)
+      return true;
+    if (null == target || getClass() != target.getClass())
+      return false;
+    return getAddress().equals(((Node) target).address);
+  }
+
+  @Override
+  public int hashCode(){
+    int result = 17;
+    result = 37 * result + address.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString(){
+    final Double[] samples = samples();
+    final StringBuilder builder = new StringBuilder();
+    for(double d: samples){
+      builder.append(" "+d+" ");
+    }
+    // mean()/variance() reject an empty window; toString must never throw
+    final String stats = (0 == samples.length)?
+      " mean:n/a variance:n/a":
+      " mean:"+mean(samples)+" variance:"+variance(samples);
+    return "Node address:"+this.address+stats+
+      " samples:["+builder.toString()+"]";
+  }
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+
+/**
+ * Failure detector client, sending heartbeat to supervisor.
+ */
+public interface Sensor {
+
+ /**
+ * The heartbeat function, signifying the existence of this sensor.
+ * @throws IOException if the implementation fails to send the heartbeat.
+ */
+ void heartbeat() throws IOException;
+
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+/**
+ * Binary interpreter: maps the suspicion level (phi) reported by a
+ * supervisor onto a plain alive/not-alive answer, so an application
+ * can check whether a specific groom server is alive.
+ */
+public class SimpleBinaryInterpreter implements Interpreter{
+
+  private final Supervisor supervisor;
+
+  /**
+   * @param supervisor that supplies the suspicion level; must not be null.
+   * @throws NullPointerException if supervisor is null.
+   */
+  public SimpleBinaryInterpreter(Supervisor supervisor){
+    if(null == supervisor)
+      throw new NullPointerException("Supervisor is not provided.");
+    this.supervisor = supervisor;
+  }
+
+  /**
+   * A node counts as alive as long as its suspicion level is not infinite.
+   * @param address to be checked.
+   * @return true indicates the target node is alive; false otherwise.
+   */
+  public boolean isAlive(String address){
+    final double level = this.supervisor.suspicionLevel(address);
+    return !Double.isInfinite(level);
+  }
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+/**
+ * A failure detector component. It is responsible for receiving the
+ * heartbeat and outputting a suspicion level for the Interpreter.
+ */
+public interface Supervisor {
+
+ /**
+ * The output value represents the suspicion level of a node.
+ * @param addr of the endpoint to be checked.
+ * @return double value as the suspicion level of the endpoint.
+ */
+ double suspicionLevel(String addr);
+
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Failure detector UDP client. It periodically sends an 8-byte datagram
+ * carrying an increasing sequence number to the supervisor.
+ */
+public class UDPSensor implements Sensor, Runnable{
+
+  public static final Log LOG = LogFactory.getLog(UDPSensor.class);
+
+  /* UDP server address and port */
+  private final String address;
+  private final int port;
+  /** Pause in milliseconds between two heartbeats of this sensor. */
+  private final long heartbeatInterval;
+  /** Null when the channel could not be opened by the constructor. */
+  private final DatagramChannel channel;
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final AtomicLong sequence = new AtomicLong(0);
+
+  /**
+   * Constructor for UDP client. Reads the configuration and opens the
+   * datagram channel. A failure to open the channel is logged only; the
+   * sensor is then unable to send and heartbeat() throws IOException.
+   * @param configuration providing bsp.monitor.fd.udp_address (default
+   *        localhost), bsp.monitor.fd.udp_port (default 16384) and
+   *        bsp.monitor.fd.heartbeat_interval (default 100).
+   */
+  public UDPSensor(Configuration configuration){
+    // get()/getInt() are declared on Configuration itself, so no downcast
+    // to HamaConfiguration is needed (it would reject a plain Configuration)
+    this.address = configuration.get("bsp.monitor.fd.udp_address",
+      "localhost");
+    this.port = configuration.getInt("bsp.monitor.fd.udp_port", 16384);
+    this.heartbeatInterval = configuration.getInt(
+      "bsp.monitor.fd.heartbeat_interval", 100);
+    DatagramChannel ch = null;
+    try{
+      ch = DatagramChannel.open();
+    }catch(IOException ioe){
+      LOG.error("Fail to initialize udp channel.", ioe);
+    }
+    this.channel = ch;
+    running.set(true);
+  }
+
+  /**
+   * The heartbeat function, signifying its existence. Sends the next
+   * sequence number as one 8-byte datagram to the configured address.
+   * @throws IOException if the channel is unavailable or sending fails.
+   */
+  public void heartbeat() throws IOException{
+    if(null == this.channel)
+      throw new IOException("UDP channel is not available.");
+    final long seq = sequence.incrementAndGet();
+    final ByteBuffer heartbeat = ByteBuffer.allocate(8);
+    heartbeat.putLong(seq);
+    heartbeat.flip();
+    channel.send(heartbeat, new InetSocketAddress(InetAddress.getByName(
+      this.address), this.port));
+    if(LOG.isDebugEnabled()){
+      LOG.debug("Heartbeat sequence "+seq+" is sent to "+this.address+":"+
+        this.port);
+    }
+  }
+
+  public String getAddress(){
+    return this.address;
+  }
+
+  public int getPort(){
+    return this.port;
+  }
+
+  public long heartbeatInterval(){
+    return heartbeatInterval;
+  }
+
+  /**
+   * Sends heartbeats until shutdown() is called or the thread is
+   * interrupted. The pause is taken after every attempt, successful or
+   * not, so a failing channel cannot turn the loop into a busy spin.
+   */
+  public void run(){
+    while(running.get()){
+      try{
+        heartbeat();
+      }catch(IOException ioe){
+        // a send failing because shutdown() closed the channel is expected
+        if(running.get())
+          LOG.error("Sensor fails in sending heartbeat.", ioe);
+      }
+      try{
+        Thread.sleep(heartbeatInterval);
+      }catch(InterruptedException ie){
+        LOG.error("UDPSensor is interrupted.", ie);
+        Thread.currentThread().interrupt();
+        // leave the loop: with the flag restored sleep() would fail at once
+        break;
+      }
+    }
+    LOG.info("Sensor at "+this.address+" stops sending heartbeat.");
+  }
+
+  /**
+   * Stops the heartbeat loop and closes the channel.
+   */
+  public void shutdown(){
+    running.set(false);
+    if(null != this.channel) {
+      try{
+        this.channel.socket().close();
+        this.channel.close();
+      }catch(IOException ioe){
+        LOG.error("Error closing sensor channel.",ioe);
+      }
+    }
+  }
+
+  /**
+   * @return true when the sensor is stopped and its socket is closed
+   *         (a channel that never opened counts as closed).
+   */
+  public boolean isShutdown(){
+    final boolean closed =
+      null == this.channel || this.channel.socket().isClosed();
+    return closed && !running.get();
+  }
+
+}
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
(added)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.monitor.fd;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * UDP supervisor is responsible for receiving the
+ * heartbeat and output suspicion level for Interpreter.
+ */
+public class UDPSupervisor implements Supervisor, Runnable{
+
+ public static final Log LOG = LogFactory.getLog(UDPSupervisor.class);
+
+ private static int WINDOW_SIZE = 100;
+ private static final List<Node> nodes = new CopyOnWriteArrayList<Node>();
+ private final ScheduledExecutorService sched;
+ private final DatagramChannel channel;
+ private AtomicBoolean running = new AtomicBoolean(false);
+
+ public static class Hermes implements Callable{
+ private final Node node;
+ private final long heartbeat;
+ private final long sequence;
+
+ /**
+ * Logic unit deals with sensors' status.
+ * It first check if the packet coming is 1, indicating the arrival
+ * of new packet. Then it checkes the node position within the list,
+ * i.e., nodes. If -1 returns, a completely refresh packet arrives;
+ * therefore adding node info to the nodes list; otherwise reseting
+ * the node sampling window and the latest heartbeat value.
+ * If the packet comes with the sequence other than 1, meaning its
+ * status of continuous sending heartbeat, thus retrieve old node
+ * from list and process necessary steps, such as manipulating sample
+ * windows and assigning the last heartbeat.
+ * @param address of specific node equipted with sensor.
+ * @param sequence number is generated by the sensor.
+ * @param heartbeat timestamped by the supervisor.
+ */
+ public Hermes(String address, final long sequence, final long heartbeat){
+ Node n = new Node(address, WINDOW_SIZE);
+ int p = nodes.indexOf(n);
+ Node tmp = null;
+ if(1L == sequence) {
+ if(-1 == p){// fresh
+ tmp = n;
+ nodes.add(tmp);
+ }else{// node crashed then restarted
+ tmp = nodes.get(p);
+ tmp.reset();
+ }
+ }else{
+ if(-1 == p){
+ LOG.warn("Non existing host ("+address+") is sending heartbeat"+
+ " sequence "+sequence+"!!!");
+ }else{
+ tmp = nodes.get(p);
+ }
+ }
+ this.node = tmp;
+ this.heartbeat = heartbeat;
+ this.sequence = sequence;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ this.node.add(this.heartbeat);
+ return null;
+ }
+ }
+
+ /**
+ * UDP Supervisor.
+ */
+ public UDPSupervisor(Configuration conf){
+ DatagramChannel ch = null;
+ try{
+ ch = DatagramChannel.open();
+ }catch(IOException ioe){
+ LOG.error("Fail to open udp channel.", ioe);
+ }
+ this.channel = ch;
+ if(null == this.channel) throw new NullPointerException();
+ try{
+ this.channel.socket().bind((SocketAddress)new InetSocketAddress(
+ conf.getInt("bsp.monitor.fd.udp_port", 16384)));
+ }catch(SocketException se){
+ LOG.error("Unable to bind the udp socket.", se);
+ }
+ WINDOW_SIZE = conf.getInt("bsp.monitor.fd.window_size", 100);
+ sched = Executors.newScheduledThreadPool(conf.
+ getInt("bsp.monitor.fd.supervisor_threads", 20));
+ }
+
+ /**
+ * The output value represents the level of a node's status,
+ * Normally called by Interpretor.
+ * @param addr to be checked.
+ * @return double value as the suspicion level of the endpoint.
+ * -1d indicates not found.
+ */
+ @Override
+ public double suspicionLevel(String addr) {
+ if(null == addr || "".equals(addr))
+ throw new NullPointerException("Target address is not provided.");
+ for(Node n: nodes){
+ if(addr.equals(n.getAddress())) {
+ return n.phi(System.currentTimeMillis());
+ }
+ }
+ return -1d;
+ }
+
+ public void run(){
+ ByteBuffer packet = ByteBuffer.allocate(8);
+ try{
+ running.set(true);
+ while(running.get()){
+ SocketAddress source = (InetSocketAddress)channel.receive(packet);
+ packet.flip();
+ long seq = packet.getLong();
+ packet.clear();
+ if(LOG.isDebugEnabled()){
+ LOG.debug("seqence: "+seq+" src address: "+
+ ((InetSocketAddress)source).getAddress().getHostAddress());
+ }
+ sched.schedule(new Hermes(((InetSocketAddress)source).getAddress()
+ .getHostAddress(), seq, System.currentTimeMillis()), 0, SECONDS);
+ }
+ }catch(IOException ioe){
+ LOG.error("Problem in receiving packet from channel.", ioe);
+ Thread.currentThread().interrupt();
+ }finally{
+ if(null != this.channel)
+ try{
+ this.channel.socket().close();
+ this.channel.close();
+ }catch(IOException ioe){
+ LOG.error("Error closing supervisor channel.",ioe);
+ }
+ }
+ }
+
+ public void shutdown(){
+ running.set(false);
+ sched.shutdown();
+ }
+
+ public boolean isShutdown(){
+ return this.channel.socket().isClosed() && !running.get();
+ }
+}
Added:
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java?rev=1310987&view=auto
==============================================================================
---
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
(added)
+++
incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
Sun Apr 8 12:42:55 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.monitor.fd;
+
+import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Test case for Phi accrual failure detector.
+ */
+public class TestFD extends HamaCluster {
+  public static final Log LOG = LogFactory.getLog(TestFD.class);
+  final HamaConfiguration conf;
+  final ScheduledExecutorService sched;
+
+  public TestFD() {
+    this.conf = getConf();
+    this.sched = Executors.newScheduledThreadPool(10);
+  }
+
+  // NOTE(review): deliberately empty override, presumably so that the
+  // cluster set-up of HamaCluster is skipped - confirm.
+  public void setUp() throws Exception { }
+
+  /**
+   * Test Phi Accrual Failure Detector: while the sensor keeps sending
+   * heartbeats the suspicion level must stay finite.
+   */
+  public void testCumulativeDistributedFunction() throws Exception {
+    this.conf.setInt("bsp.monitor.fd.udp_port", 9765);
+    UDPSupervisor server = new UDPSupervisor(this.conf);
+    UDPSensor client = new UDPSensor(this.conf);
+    this.sched.schedule(server, 0, SECONDS);
+    this.sched.schedule(client, 2, SECONDS);
+    try{
+      for(int round = 1; round <= 11; round++){
+        Thread.sleep(1000*3);
+        double phi = server.suspicionLevel("127.0.0.1");
+        if(LOG.isDebugEnabled())
+          LOG.debug("Phi value:"+phi+" Double.isInfinite(phi):"+
+            Double.isInfinite(phi));
+        assertTrue("In normal case phi should not go infinity!",
+          !Double.isInfinite(phi));
+      }
+    }finally{
+      // release sockets and threads even when an assertion fails
+      client.shutdown();
+      server.shutdown();
+    }
+    LOG.info("Finished testing suspicion level.");
+  }
+
+  /**
+   * Test when sensor fails: once the sensor stops sending heartbeats the
+   * suspicion level must go infinity.
+   */
+  public void testSensorFailure() throws Exception{
+    this.conf.setInt("bsp.monitor.fd.udp_port", 2874);
+    UDPSupervisor server = new UDPSupervisor(this.conf);
+    UDPSensor client = new UDPSensor(this.conf);
+    this.sched.schedule(server, 0, SECONDS);
+    this.sched.schedule(client, 2, SECONDS);
+    try{
+      // let the sensor fill the sampling window before it is stopped
+      for(int round = 1; round <= 6; round++){
+        Thread.sleep(1000*3);
+      }
+      client.shutdown();
+      Thread.sleep(1000*4);
+      double phi = server.suspicionLevel("127.0.0.1");
+      if(LOG.isDebugEnabled())
+        LOG.debug("Phi value should go infinity:"+Double.isInfinite(phi));
+      assertTrue("Phi should go infinity once the sensor stops heartbeat!",
+        Double.isInfinite(phi));
+    }finally{
+      // shutdown is safe to repeat; covers a failure during warm-up
+      client.shutdown();
+      server.shutdown();
+    }
+    LOG.info("Finished testing client failure case.");
+  }
+
+  public void tearDown() throws Exception {
+    sched.shutdown();
+  }
+}