Author: chl501 Date: Sun Apr 8 12:42:55 2012 New Revision: 1310987 URL: http://svn.apache.org/viewvc?rev=1310987&view=rev Log: [HAMA-370] Failure detector for Hama
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/ incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java Removed: incubator/hama/trunk/core/src/main/java/org/apache/hama/metrics/ incubator/hama/trunk/core/src/test/java/org/apache/hama/metrics/ Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Interpreter.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor.fd; + +/** + * A component used by an application to query the target staus. + */ +public interface Interpreter { + + /** + * An output value represents the level of a node's status. + * @param address to be checked. + * @return true indicates the target node is alive; false otherwise. + */ + boolean isAlive(String address); + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Node.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor.fd; + +import java.util.ArrayDeque; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.distribution.NormalDistribution; +import org.apache.commons.math.distribution.NormalDistributionImpl; +import org.apache.commons.math.MathException; + +public final class Node{ + + public static final Log LOG = LogFactory.getLog(Node.class); + + private final String address; + /** + * Sliding window that stores inter-arrival time. For instance, + * T={10, 12, 14, 17, 23, 25} Tinter-arrival={2, 2, 3, 6, 2} + */ + private final ArrayDeque<Double> samplingWindow; // fix size ws + private final int windowSize; + /* The latest heartbeat */ + private final AtomicLong latestHeartbeat = new AtomicLong(0); + + public Node(String address, int size){ + this.address = address; + this.windowSize = size; + this.samplingWindow = new ArrayDeque<Double>(windowSize); + if(null == this.address) + throw new NullPointerException("Address is not provided"); + } + + public String getAddress(){ + return this.address; + } + + void setLatestHeartbeat(long latestHeartbeat){ + this.latestHeartbeat.set(latestHeartbeat); + } + + public long getLatestHeartbeat(){ + return this.latestHeartbeat.get(); + } + + public synchronized void reset(){ + this.samplingWindow.clear(); + setLatestHeartbeat(0); + } + + /** + * The size used for storing samples. + * @return int value fixed without change over time. 
+ */ + public int windowSize(){ + return windowSize; + } + + /** + * Inter-arrival times data as array. + * @return Double array format. + */ + public Double[] samples(){ + return (Double[]) samplingWindow.toArray( + new Double[samplingWindow.size()]); + } + + /** + * Store the latest inter-arrival time to sampling window. + * The head of the array will be dicarded. Newly received heartbeat + * is added to the tail of the sliding window. + * @param heartbeat value is the current timestamp the client . + */ + public void add(long heartbeat){ + if(null == this.samplingWindow) + throw new NullPointerException("Sampling windows not exist."); + synchronized(this.samplingWindow){ + if(0 != getLatestHeartbeat()) { + if(samplingWindow.size() == this.windowSize){ + samplingWindow.remove(); + } + samplingWindow.add(new Double(heartbeat-getLatestHeartbeat())); + } + setLatestHeartbeat(heartbeat); + } + } + + /** + * Calculate cumulative distribution function value according to + * the current timestamp, heartbeat in sampling window, and last heartbeat. + * @param timestamp is the current timestamp. + * @param samples contain inter-arrival time in the sampling window. + * @return double value as cdf, which stays between 0.0 and 1.0. + */ + public double cdf(long timestamp, Double[] samples){ + double cdf = -1d; + double mean = mean(samples); + double variance = variance(samples); + NormalDistribution cal = new NormalDistributionImpl(mean, variance); + try{ + cdf = cal.cumulativeProbability( + ((double)timestamp-(double)getLatestHeartbeat())); + }catch(MathException me){ + LOG.error("Fail to compute phi value.", me); + } + if(LOG.isDebugEnabled()) LOG.debug("Calcuated cdf:"+cdf); + return cdf; + } + + /** + * Ouput phi value. + * @param now is the current timestamp. + * @return phi value, which goes infinity when cdf returns 1, and + * stays -0.0 when cdf is 0. 
+ */ + public double phi(long now){ + return (-1) * Math.log10(1-cdf(now, this.samples())); + } + + /** + * Mean of the samples. + * @return double value for mean of the samples. + */ + public double mean(Double[] samples){ + int len = samples.length; + if(0 >= len) + throw new RuntimeException("Samples data does not exist."); + double sum = 0d; + for(double sample: samples){ + sum += sample; + } + return sum/(double)len; + } + + /** + * Standard deviation. + * @return double value of standard deviation. + */ + public double variance(Double[] samples){ + int len = samples.length; + double mean = mean(samples); + double sumd = 0d; + for(double sample: samples) { + double v = sample - mean; + sumd += v*v; + } + return sumd/(double)len; + } + + @Override + public boolean equals(final Object target){ + if (target == this) + return true; + if (null == target) + return false; + if (getClass() != target.getClass()) + return false; + + Node n = (Node) target; + if(!getAddress().equals(n.address)) + return false; + + return true; + } + + @Override + public int hashCode(){ + int result = 17; + result = 37 * result + address.hashCode(); + return result; + } + + @Override + public String toString(){ + Double[] samples = samples(); + StringBuilder builder = new StringBuilder(); + for(double d: samples){ + builder.append(" "+d+" "); + } + return "Node address:"+this.address+" mean:"+mean(samples)+" variance:"+ + variance(samples)+" samples:["+builder.toString()+"]"; + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,33 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.monitor.fd; + +import java.io.IOException; + +/** + * Failure detector client, sending heartbeat to supervisor. + */ +public interface Sensor { + + /** + * The heartbeat function, signifying its existence. + */ + void heartbeat() throws IOException; + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/SimpleBinaryInterpreter.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor.fd; + +/** + * A simple binary interpreter translates phi value to application + * for checking if a specific groom server is alive. + */ +public class SimpleBinaryInterpreter implements Interpreter{ + + private final Supervisor supervisor; + + public SimpleBinaryInterpreter(Supervisor supervisor){ + this.supervisor = supervisor; + if(null == this.supervisor) + throw new NullPointerException("Supervisor is not provided."); + } + + /** + * An output value represents the level of a node's status. + * @param address to be checked. + * @return true indicates the target node is alive; false otherwise. 
+ */ + public boolean isAlive(String address){ + if(Double.isInfinite(this.supervisor.suspicionLevel(address))){ + return false; + } + return true; + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.monitor.fd; + +/** + * A failure detector component. It is responsible for receiving the + * heartbeat and output suspicion level for Interpreter. + */ +public interface Supervisor { + + /** + * The output value represents the level of a node's status. + * @param addr to be checked. + * @return double value as the suspicion level of the endpoint. 
+ */ + double suspicionLevel(String addr); + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.monitor.fd; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; + +/** + * Failure detector UDP client. 
+ */ +public class UDPSensor implements Sensor, Runnable{ + + public static final Log LOG = LogFactory.getLog(UDPSensor.class); + /** + * The default interval hearbeat. + */ + private static long HEARTBEAT_INTERVAL; + + /* UDP server address and port */ + private String address; + private int port; + private DatagramChannel channel; + private AtomicBoolean running = new AtomicBoolean(false); + private AtomicLong sequence = new AtomicLong(0); + + /** + * Constructor for UDP client. Setting up configuration + * and open DatagramSocket. + */ + public UDPSensor(Configuration configuration){ + this.address = + ((HamaConfiguration)configuration).get("bsp.monitor.fd.udp_address", "localhost"); + this.port = + ((HamaConfiguration)configuration).getInt("bsp.monitor.fd.udp_port", 16384); + HEARTBEAT_INTERVAL = ((HamaConfiguration)configuration).getInt( + "bsp.monitor.fd.heartbeat_interval", 100); + running.set(true); + try{ + channel = DatagramChannel.open(); + }catch(IOException ioe){ + LOG.error("Fail to initialize udp channel.", ioe); + } + } + + + /** + * The heartbeat function, signifying its existence. 
+ */ + public void heartbeat() throws IOException{ + ByteBuffer heartbeat = ByteBuffer.allocate(8); + heartbeat.clear(); + heartbeat.putLong(sequence.incrementAndGet()); + heartbeat.flip(); + channel.send(heartbeat, new InetSocketAddress(InetAddress.getByName( + this.address), this.port)); + if(LOG.isDebugEnabled()){ + LOG.debug("Heartbeat sequence "+sequence.get()+ " is sent to "+this.address+":"+ this.port); + } + } + + public String getAddress(){ + return this.address; + } + + public int getPort(){ + return this.port; + } + + public long heartbeatInterval(){ + return HEARTBEAT_INTERVAL; + } + + public void run(){ + while(running.get()){ + try{ + heartbeat(); + Thread.sleep(HEARTBEAT_INTERVAL); + }catch(InterruptedException ie){ + LOG.error("UDPSensor is interrupted.", ie); + Thread.currentThread().interrupt(); + }catch(IOException ioe){ + LOG.error("Sensor fails in sending heartbeat.", ioe); + } + } + LOG.info("Sensor at "+this.address+" stops sending heartbeat."); + } + + public void shutdown(){ + running.set(false); + if(null != this.channel) { + try{ + this.channel.socket().close(); + this.channel.close(); + }catch(IOException ioe){ + LOG.error("Error closing sensor channel.",ioe); + } + } + } + + + public boolean isShutdown(){ + return this.channel.socket().isClosed() && !running.get(); + } + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSupervisor.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.monitor.fd; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import static java.util.concurrent.TimeUnit.*; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * UDP supervisor is responsible for receiving the + * heartbeat and output suspicion level for Interpreter. 
+ */ +public class UDPSupervisor implements Supervisor, Runnable{ + + public static final Log LOG = LogFactory.getLog(UDPSupervisor.class); + + private static int WINDOW_SIZE = 100; + private static final List<Node> nodes = new CopyOnWriteArrayList<Node>(); + private final ScheduledExecutorService sched; + private final DatagramChannel channel; + private AtomicBoolean running = new AtomicBoolean(false); + + public static class Hermes implements Callable{ + private final Node node; + private final long heartbeat; + private final long sequence; + + /** + * Logic unit deals with sensors' status. + * It first check if the packet coming is 1, indicating the arrival + * of new packet. Then it checkes the node position within the list, + * i.e., nodes. If -1 returns, a completely refresh packet arrives; + * therefore adding node info to the nodes list; otherwise reseting + * the node sampling window and the latest heartbeat value. + * If the packet comes with the sequence other than 1, meaning its + * status of continuous sending heartbeat, thus retrieve old node + * from list and process necessary steps, such as manipulating sample + * windows and assigning the last heartbeat. + * @param address of specific node equipted with sensor. + * @param sequence number is generated by the sensor. + * @param heartbeat timestamped by the supervisor. 
+ */ + public Hermes(String address, final long sequence, final long heartbeat){ + Node n = new Node(address, WINDOW_SIZE); + int p = nodes.indexOf(n); + Node tmp = null; + if(1L == sequence) { + if(-1 == p){// fresh + tmp = n; + nodes.add(tmp); + }else{// node crashed then restarted + tmp = nodes.get(p); + tmp.reset(); + } + }else{ + if(-1 == p){ + LOG.warn("Non existing host ("+address+") is sending heartbeat"+ + " sequence "+sequence+"!!!"); + }else{ + tmp = nodes.get(p); + } + } + this.node = tmp; + this.heartbeat = heartbeat; + this.sequence = sequence; + } + + @Override + public Object call() throws Exception { + this.node.add(this.heartbeat); + return null; + } + } + + /** + * UDP Supervisor. + */ + public UDPSupervisor(Configuration conf){ + DatagramChannel ch = null; + try{ + ch = DatagramChannel.open(); + }catch(IOException ioe){ + LOG.error("Fail to open udp channel.", ioe); + } + this.channel = ch; + if(null == this.channel) throw new NullPointerException(); + try{ + this.channel.socket().bind((SocketAddress)new InetSocketAddress( + conf.getInt("bsp.monitor.fd.udp_port", 16384))); + }catch(SocketException se){ + LOG.error("Unable to bind the udp socket.", se); + } + WINDOW_SIZE = conf.getInt("bsp.monitor.fd.window_size", 100); + sched = Executors.newScheduledThreadPool(conf. + getInt("bsp.monitor.fd.supervisor_threads", 20)); + } + + /** + * The output value represents the level of a node's status, + * Normally called by Interpretor. + * @param addr to be checked. + * @return double value as the suspicion level of the endpoint. + * -1d indicates not found. 
+ */ + @Override + public double suspicionLevel(String addr) { + if(null == addr || "".equals(addr)) + throw new NullPointerException("Target address is not provided."); + for(Node n: nodes){ + if(addr.equals(n.getAddress())) { + return n.phi(System.currentTimeMillis()); + } + } + return -1d; + } + + public void run(){ + ByteBuffer packet = ByteBuffer.allocate(8); + try{ + running.set(true); + while(running.get()){ + SocketAddress source = (InetSocketAddress)channel.receive(packet); + packet.flip(); + long seq = packet.getLong(); + packet.clear(); + if(LOG.isDebugEnabled()){ + LOG.debug("seqence: "+seq+" src address: "+ + ((InetSocketAddress)source).getAddress().getHostAddress()); + } + sched.schedule(new Hermes(((InetSocketAddress)source).getAddress() + .getHostAddress(), seq, System.currentTimeMillis()), 0, SECONDS); + } + }catch(IOException ioe){ + LOG.error("Problem in receiving packet from channel.", ioe); + Thread.currentThread().interrupt(); + }finally{ + if(null != this.channel) + try{ + this.channel.socket().close(); + this.channel.close(); + }catch(IOException ioe){ + LOG.error("Error closing supervisor channel.",ioe); + } + } + } + + public void shutdown(){ + running.set(false); + sched.shutdown(); + } + + public boolean isShutdown(){ + return this.channel.socket().isClosed() && !running.get(); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java?rev=1310987&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java Sun Apr 8 12:42:55 2012 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hama.monitor.fd;

import static java.util.concurrent.TimeUnit.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;

/**
 * Test case for the phi accrual failure detector (UDPSupervisor + UDPSensor
 * over localhost UDP).
 */
public class TestFD extends HamaCluster {
  public static final Log LOG = LogFactory.getLog(TestFD.class);
  final HamaConfiguration conf;
  final ScheduledExecutorService sched;

  public TestFD() {
    this.conf = getConf();
    this.sched = Executors.newScheduledThreadPool(10);
  }

  /** Overridden as a no-op so that HamaCluster's own set-up is skipped. */
  public void setUp() throws Exception { }

  /**
   * While the sensor keeps sending heartbeats, phi must stay finite.
   */
  public void testCumulativeDistributedFunction() throws Exception {
    this.conf.setInt("bsp.monitor.fd.udp_port", 9765);
    UDPSupervisor server = new UDPSupervisor(this.conf);
    UDPSensor client = new UDPSensor(this.conf);
    try {
      this.sched.schedule(server, 0, SECONDS);
      this.sched.schedule(client, 2, SECONDS);
      for (int count = 1; count <= 11; count++) {
        Thread.sleep(1000 * 3);
        double phi = server.suspicionLevel("127.0.0.1");
        if (LOG.isDebugEnabled()) {
          LOG.debug("Phi value:" + phi + " Double.isInfinite(phi):"
              + Double.isInfinite(phi));
        }
        assertTrue("In normal case phi should not go infinity!",
            !Double.isInfinite(phi));
      }
    } finally {
      // Release threads and the UDP port even when an assertion fails.
      client.shutdown();
      server.shutdown();
    }
    LOG.info("Finished testing suspicion level.");
  }

  /**
   * Once the sensor stops sending heartbeats, phi must become infinite.
   */
  public void testSensorFailure() throws Exception {
    this.conf.setInt("bsp.monitor.fd.udp_port", 2874);
    UDPSupervisor server = new UDPSupervisor(this.conf);
    UDPSensor client = new UDPSensor(this.conf);
    boolean sensorStopped = false;
    try {
      this.sched.schedule(server, 0, SECONDS);
      this.sched.schedule(client, 2, SECONDS);
      for (int count = 1; count <= 11; count++) {
        // Exercise the query path; the value is not checked before the
        // sensor is stopped.
        server.suspicionLevel("127.0.0.1");
        Thread.sleep(1000 * 3);
        if (5 < count) {
          if (!sensorStopped) {
            client.shutdown();
            sensorStopped = true;
          }
          Thread.sleep(1000 * 4);
          double phi = server.suspicionLevel("127.0.0.1");
          if (LOG.isDebugEnabled()) {
            LOG.debug("Phi value should go infinity:" + Double.isInfinite(phi));
          }
          assertTrue("Phi should go infinity once the sensor has stopped!",
              Double.isInfinite(phi));
        }
      }
    } finally {
      client.shutdown();
      server.shutdown();
    }
    LOG.info("Finished testing client failure case.");
  }

  public void tearDown() throws Exception {
    sched.shutdown();
  }
}