Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java (from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryService.java) URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryService.java&r1=781592&r2=794825&rev=794825&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryService.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java Thu Jul 16 20:14:42 2009 @@ -1,4 +1,4 @@ -package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery; +package org.apache.jcs.utils.discovery; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,24 +21,18 @@ import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait; -import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade; -import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; -import org.apache.jcs.engine.behavior.ICompositeCacheManager; -import org.apache.jcs.engine.behavior.IElementSerializer; -import org.apache.jcs.engine.behavior.IShutdownObservable; import org.apache.jcs.engine.behavior.IShutdownObserver; import org.apache.jcs.engine.logging.behavior.ICacheEventLogger; +import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener; import org.apache.jcs.utils.net.HostNameUtil; import EDU.oswego.cs.dl.util.concurrent.ClockDaemon; +import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet; import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; /** @@ -47,6 +41,7 @@ * It also creates a sender that periodically broadcasts its availability. * <p> * The sender also broadcasts a request for other caches to broadcast their addresses. + * <p> * @author Aaron Smuts */ public class UDPDiscoveryService @@ -64,61 +59,39 @@ /** the runanble that the receiver thread runs */ private UDPDiscoveryReceiver receiver; - /** Map of facades */ - private Map facades = new HashMap(); - /** the runnanble that sends messages via the clock daemon */ private UDPDiscoverySenderThread sender = null; - /** The host address */ - private String hostAddress = "unknown"; - - /** THe udp address */ - private String discoveryAddress; + /** removes things that have been idle for too long */ + private UDPCleanupRunner cleanup; - /** The port to braodcast to. */ - private int discoveryPort; + /** attributes */ + private UDPDiscoveryAttributes udpDiscoveryAttributes = null; - /** - * the port this service runs on, the service we are telling other about. we should have a - * service info object instead - */ - private int servicePort; + /** is this shut down? */ + private boolean shutdown = false; - /** Attributes for creating new instances. */ - private ITCPLateralCacheAttributes tcpLateralCacheAttributes; + /** This is a set of services that have been discovered. */ + private Set discoveredServices = new CopyOnWriteArraySet(); - /** The event logger. */ - protected ICacheEventLogger cacheEventLogger; + /** This a list of regions that are configured to use discovery. */ + private Set cacheNames = new CopyOnWriteArraySet(); - /** The serializer. */ - protected IElementSerializer elementSerializer; + /** handles add and remove. consider making into a set. */ + private IDiscoveryListener discoveryListener; /** - * @param discoveryAddress address to multicast to - * @param discoveryPort port to multicast to - * @param servicePort the port this service runs on, the service we are telling other about - * @param cacheMgr + * @param attributes * @param cacheEventLogger - * @param elementSerializer */ - public UDPDiscoveryService( String discoveryAddress, int discoveryPort, int servicePort, - ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger, - IElementSerializer elementSerializer ) - { - // register for shutdown notification - ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( this ); - - this.setDiscoveryAddress( discoveryAddress ); - this.setDiscoveryPort( discoveryPort ); - this.setServicePort( servicePort ); - this.cacheEventLogger = cacheEventLogger; - this.elementSerializer = elementSerializer; + public UDPDiscoveryService( UDPDiscoveryAttributes attributes, ICacheEventLogger cacheEventLogger ) + { + udpDiscoveryAttributes = (UDPDiscoveryAttributes) attributes.clone(); try { // todo, you should be able to set this - hostAddress = HostNameUtil.getLocalHostAddress(); + udpDiscoveryAttributes.setServiceAddress( HostNameUtil.getLocalHostAddress() ); } catch ( UnknownHostException e1 ) { @@ -128,8 +101,8 @@ try { // todo need some kind of recovery here. - receiver = new UDPDiscoveryReceiver( this, getDiscoveryAddress(), getDiscoveryPort(), cacheMgr, - cacheEventLogger, elementSerializer ); + receiver = new UDPDiscoveryReceiver( this, getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), + getUdpDiscoveryAttributes().getUdpDiscoveryPort() ); udpReceiverThread = new Thread( receiver ); udpReceiverThread.setDaemon( true ); // udpReceiverThread.setName( t.getName() + "--UDPReceiver" ); @@ -137,11 +110,12 @@ } catch ( Exception e ) { - log.error( "Problem creating UDPDiscoveryReceiver, address [" + getDiscoveryAddress() + "] port [" - + getDiscoveryPort() + "] we won't be able to find any other caches", e ); + log.error( "Problem creating UDPDiscoveryReceiver, address [" + + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port [" + + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e ); } - // todo only do the passive if receive is inenabled, perhaps set the + // todo only do the passive if receive is enabled, perhaps set the // myhost to null or something on the request if ( senderDaemon == null ) { @@ -150,75 +124,25 @@ } // create a sender thread - sender = new UDPDiscoverySenderThread( getDiscoveryAddress(), getDiscoveryPort(), hostAddress, this - .getServicePort(), this.getCacheNames() ); + sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), this.getCacheNames() ); senderDaemon.executePeriodically( 30 * 1000, sender, false ); - } - - /** - * Adds a nowait facade under this cachename. If one already existed, it will be overridden. - * <p> - * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the - * message, the add no wait will be called here. To add a no wait, the facade is looked up for - * this cache name. - * @param facade - * @param cacheName - * @return true if the facade was not already registered. - */ - public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName ) - { - boolean isNew = !facades.containsKey( cacheName ); - - // override or put anew, it doesn't matter - facades.put( cacheName, facade ); - - if ( isNew ) - { - if ( sender != null ) - { - // need to reset the cache names since we have a new one - sender.setCacheNames( this.getCacheNames() ); - } - } - return isNew; - } - - /** - * This adds nowaits to a facde for the region name. If the region has no facade, then it is not - * configured to use the lateral cache, and no facde will be created. - * @param noWait - */ - protected void addNoWait( LateralCacheNoWait noWait ) - { - LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() ); - if ( log.isDebugEnabled() ) - { - log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade ); - } - - if ( facade != null ) - { - boolean isNew = facade.addNoWait( noWait ); - if ( log.isDebugEnabled() ) - { - log.debug( "Called addNoWait, isNew = " + isNew ); - } - } - else - { - if ( log.isInfoEnabled() ) - { - log.info( "Different nodes are configured differently. Region [" + noWait.getCacheName() - + "] is not configured to use the lateral cache." ); - } - } + // add the cleanup daemon too + cleanup = new UDPCleanupRunner( this ); + // I'm going to use this as both, but it could happen + // that something could hang around twice the time suing this as the + // delay and the idle time. + senderDaemon.executePeriodically( this.getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000, cleanup, false ); + + // add shutdown hook that will issue a remove call. + DiscoveryShutdownHook shutdownHook = new DiscoveryShutdownHook( this ); + Runtime.getRuntime().addShutdownHook( shutdownHook ); } /** * Send a passive broadcast in response to a request broadcast. Never send a request for a - * request. We can respond to our own reques, since a request broadcast is not intended as a + * request. We can respond to our own requests, since a request broadcast is not intended as a * connection request. We might want to only send messages, so we would send a request, but * never a passive broadcast. */ @@ -229,9 +153,11 @@ { // create this connection each time. // more robust - sender = new UDPDiscoverySender( getDiscoveryAddress(), getDiscoveryPort() ); + sender = new UDPDiscoverySender( getUdpDiscoveryAttributes().getUdpDiscoveryAddr(), + getUdpDiscoveryAttributes().getUdpDiscoveryPort() ); - sender.passiveBroadcast( hostAddress, this.getServicePort(), this.getCacheNames() ); + sender.passiveBroadcast( getUdpDiscoveryAttributes().getServiceAddress(), getUdpDiscoveryAttributes() + .getServicePort(), this.getCacheNames() ); // todo we should consider sending a request broadcast every so // often. @@ -243,8 +169,9 @@ } catch ( Exception e ) { - log.error( "Problem calling the UDP Discovery Sender. address [" + getDiscoveryAddress() + "] port [" - + getDiscoveryPort() + "]", e ); + log.error( "Problem calling the UDP Discovery Sender. address [" + + getUdpDiscoveryAttributes().getUdpDiscoveryAddr() + "] port [" + + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "]", e ); } finally { @@ -263,21 +190,102 @@ } /** + * Adds a region to the list that is participating in discovery. + * <p> + * @param cacheName + */ + public void addParticipatingCacheName( String cacheName ) + { + cacheNames.add( cacheName ); + sender.setCacheNames( getCacheNames() ); + } + + /** + * Removes the discovered service from the list and calls the discovery listener. + * <p> + * @param service + */ + public void removeDiscoveredService( DiscoveredService service ) + { + getDiscoveredServices().remove( service ); + getDiscoveryListener().removeDiscoveredService( service ); + } + + /** + * Add a service to the list. Update the held copy if we already know about it. + * <p> + * @param discoveredService discovered service + */ + protected void addOrUpdateService( DiscoveredService discoveredService ) + { + // Since this is a set we can add it over an over. + // We want to replace the old one, since we may add info that is not part of the equals. + // The equals method on the object being added is intentionally restricted. + if ( !getDiscoveredServices().contains( discoveredService ) ) + { + if ( log.isInfoEnabled() ) + { + log.info( "Set does not contain service. I discovered " + discoveredService ); + } + if ( log.isDebugEnabled() ) + { + log.debug( "Adding service in the set " + discoveredService ); + } + getDiscoveredServices().add( discoveredService ); + + // todo update some list of cachenames + getDiscoveryListener().addDiscoveredService( discoveredService ); + } + else + { + if ( log.isDebugEnabled() ) + { + log.debug( "Set contains service." ); + } + if ( log.isDebugEnabled() ) + { + log.debug( "Updating service in the set " + discoveredService ); + } + Iterator it = getDiscoveredServices().iterator(); + // need to update the time this sucks. add has no effect convert to a map + while ( it.hasNext() ) + { + DiscoveredService service1 = (DiscoveredService) it.next(); + if ( discoveredService.equals( service1 ) ) + { + service1.setLastHearFromTime( discoveredService.getLastHearFromTime() ); + break; + } + } + } + } + + /** * Get all the cache names we have facades for. * <p> * @return ArrayList */ protected ArrayList getCacheNames() { - ArrayList keys = new ArrayList(); - Set keySet = facades.keySet(); - Iterator it = keySet.iterator(); - while ( it.hasNext() ) - { - String key = (String) it.next(); - keys.add( key ); - } - return keys; + ArrayList names = new ArrayList(); + names.addAll( cacheNames ); + return names; + } + + /** + * @param attr The UDPDiscoveryAttributes to set. + */ + public void setUdpDiscoveryAttributes( UDPDiscoveryAttributes attr ) + { + this.udpDiscoveryAttributes = attr; + } + + /** + * @return Returns the lca. + */ + public UDPDiscoveryAttributes getUdpDiscoveryAttributes() + { + return this.udpDiscoveryAttributes; } /** @@ -310,99 +318,104 @@ */ public void shutdown() { - if ( log.isInfoEnabled() ) + if ( !shutdown ) { - log.info( "Shutting down UDP discovery service receiver." ); - } + shutdown = true; - try - { - // no good way to do this right now. - receiver.shutdown(); - udpReceiverThread.interrupt(); - } - catch ( Exception e ) - { - log.error( "Problem interrupting UDP receiver thread." ); - } + if ( log.isInfoEnabled() ) + { + log.info( "Shutting down UDP discovery service receiver." ); + } - if ( log.isInfoEnabled() ) - { - log.info( "Shutting down UDP discovery service sender." ); - } + try + { + // no good way to do this right now. + receiver.shutdown(); + udpReceiverThread.interrupt(); + } + catch ( Exception e ) + { + log.error( "Problem interrupting UDP receiver thread." ); + } - try - { - // interrupt all the threads. - senderDaemon.shutDown(); + if ( log.isInfoEnabled() ) + { + log.info( "Shutting down UDP discovery service sender." ); + } + + try + { + // interrupt all the threads. + senderDaemon.shutDown(); + } + catch ( Exception e ) + { + log.error( "Problem shutting down UDP sender." ); + } + + // also call the shutdown on the sender thread itself, which + // will result in a remove command. + try + { + sender.shutdown(); + } + catch ( Exception e ) + { + log.error( "Problem issuing remove broadcast via UDP sender." ); + } } - catch ( Exception e ) + else { - log.error( "Problem shutting down UDP sender." ); + if ( log.isDebugEnabled() ) + { + log.debug( "Shutdown already called." ); + } } } /** - * @param discoveryAddress The discoveryAddress to set. - */ - protected void setDiscoveryAddress( String discoveryAddress ) - { - this.discoveryAddress = discoveryAddress; - } - - /** - * @return Returns the discoveryAddress. - */ - protected String getDiscoveryAddress() - { - return discoveryAddress; - } - - /** - * @param discoveryPort The discoveryPort to set. + * Call shutdown to be safe. + * <p> + * @throws Throwable on error */ - protected void setDiscoveryPort( int discoveryPort ) + public void finalize() + throws Throwable { - this.discoveryPort = discoveryPort; - } + super.finalize(); - /** - * @return Returns the discoveryPort. - */ - protected int getDiscoveryPort() - { - return discoveryPort; + // TODO reconsider this, since it uses the logger + shutdown(); } /** - * @param servicePort The servicePort to set. + * @param discoveredServices The discoveredServices to set. */ - protected void setServicePort( int servicePort ) + public synchronized void setDiscoveredServices( Set discoveredServices ) { - this.servicePort = servicePort; + this.discoveredServices = discoveredServices; } /** - * @return Returns the servicePort. + * @return Returns the discoveredServices. */ - protected int getServicePort() + public synchronized Set getDiscoveredServices() { - return servicePort; + return discoveredServices; } /** - * @param tCPLateralCacheAttributes The tCPLateralCacheAttributes to set. + * @param discoveryListener the discoveryListener to set */ - public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tCPLateralCacheAttributes ) + public void setDiscoveryListener( IDiscoveryListener discoveryListener ) { - tcpLateralCacheAttributes = tCPLateralCacheAttributes; + this.discoveryListener = discoveryListener; } /** - * @return Returns the tCPLateralCacheAttributes. + * @return the discoveryListener */ - public ITCPLateralCacheAttributes getTcpLateralCacheAttributes() + public IDiscoveryListener getDiscoveryListener() { - return tcpLateralCacheAttributes; + return discoveryListener; } }
Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java ------------------------------------------------------------------------------ cvs2svn:cvs-rev = 1.4 Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java ------------------------------------------------------------------------------ svn:mergeinfo = Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/IDiscoveryListener.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/IDiscoveryListener.java?rev=794825&view=auto ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/IDiscoveryListener.java (added) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/IDiscoveryListener.java Thu Jul 16 20:14:42 2009 @@ -0,0 +1,25 @@ +package org.apache.jcs.utils.discovery.behavior; + +import org.apache.jcs.utils.discovery.DiscoveredService; + +/** + * Interface for things that want to listen to discovery events. This will allow discovery to be + * used outside of the TCP lateral. + */ +public interface IDiscoveryListener +{ + /** + * Add the service if needed. This does not necessarily mean that the service is not already + * added. This can be called if there is a change. + * <p> + * @param service the service to add + */ + public void addDiscoveredService( DiscoveredService service ); + + /** + * Remove the service from the list. + * <p> + * @param service the service to remove + */ + public void removeDiscoveredService( DiscoveredService service ); +} Modified: jakarta/jcs/trunk/src/test-conf/log4j.properties URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test-conf/log4j.properties?rev=794825&r1=794824&r2=794825&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test-conf/log4j.properties (original) +++ jakarta/jcs/trunk/src/test-conf/log4j.properties Thu Jul 16 20:14:42 2009 @@ -28,6 +28,7 @@ log4j.category.org.apache.jcs.auxiliary.lateral=INFO log4j.category.org.apache.jcs.utils.struct=INFO log4j.category.org.apache.jcs.utils.threadpool=INFO +log4j.category.org.apache.jcs.utils.discovery=DEBUG log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout Copied: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java (from r781592, jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/MockLateralCache.java) URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java?p2=jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java&p1=jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/MockLateralCache.java&r1=781592&r2=794825&rev=794825&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/MockLateralCache.java (original) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java Thu Jul 16 20:14:42 2009 @@ -1,4 +1,4 @@ -package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery; +package org.apache.jcs.auxiliary.lateral.socket.tcp; /* * Licensed to the Apache Software Foundation (ASF) under one Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java ------------------------------------------------------------------------------ cvs2svn:cvs-rev = 1.2 Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java ------------------------------------------------------------------------------ svn:mergeinfo = Added: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/MockDiscoveryListener.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/MockDiscoveryListener.java?rev=794825&view=auto ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/MockDiscoveryListener.java (added) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/MockDiscoveryListener.java Thu Jul 16 20:14:42 2009 @@ -0,0 +1,35 @@ +package org.apache.jcs.utils.discovery; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener; + +/** Mock listener, for testing. */ +public class MockDiscoveryListener + implements IDiscoveryListener +{ + /** discovered services. */ + public List discoveredServices = new ArrayList(); + + /** + * Adds the entry to a list. I'm not using a set. I want to see if we get dupes. + * <p> + * @param service + */ + public void addDiscoveredService( DiscoveredService service ) + { + discoveredServices.add( service ); + } + + /** + * Removes it from the list. + * <p> + * @param service + */ + public void removeDiscoveredService( DiscoveredService service ) + { + discoveredServices.remove( service ); + } + +} Added: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java?rev=794825&view=auto ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java (added) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java Thu Jul 16 20:14:42 2009 @@ -0,0 +1,130 @@ +package org.apache.jcs.utils.discovery; + +import java.util.ArrayList; + +import junit.framework.TestCase; + +/** + * Tests for the sender. + */ +public class UDPDiscoverySenderUnitTest + extends TestCase +{ + /** multicast address to send/receive on */ + private static final String ADDRESS = "228.4.5.9"; + + /** multicast address to send/receive on */ + private static final int PORT = 5555; + + /** imaginary host address for sending */ + private static final String SENDING_HOST = "imaginary host address"; + + /** imaginary port for sending */ + private static final int SENDING_PORT = 1; + + /** receiver instance for tests */ + private UDPDiscoveryReceiver receiver; + + /** sender instance for tests */ + private UDPDiscoverySender sender; + + /** + * Set up the receiver. Maybe better to just code sockets here? Set up the sender for sending + * the message. + * <p> + * @throws Exception on error + */ + protected void setUp() + throws Exception + { + super.setUp(); + receiver = new UDPDiscoveryReceiver( null, ADDRESS, PORT ); + sender = new UDPDiscoverySender( ADDRESS, PORT ); + } + + /** + * Kill off the sender and receiver. + * <p> + * @throws Exception on error + */ + protected void tearDown() + throws Exception + { + receiver.shutdown(); + sender.destroy(); + super.tearDown(); + } + + /** + * Test sending a live messages. + * <p> + * @throws Exception on error + */ + public void testPassiveBroadcast() + throws Exception + { + // SETUP + ArrayList cacheNames = new ArrayList(); + + // DO WORK + sender.passiveBroadcast( SENDING_HOST, SENDING_PORT, cacheNames, 1L ); + + // VERIFY + // grab the sent message + Object obj = receiver.waitForMessage() ; + + assertTrue( "unexpected crap received", obj instanceof UDPDiscoveryMessage ); + + UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; + assertEquals( "wrong host", SENDING_HOST, msg.getHost() ); + assertEquals( "wrong port", SENDING_PORT, msg.getPort() ); + assertEquals( "wrong message type", UDPDiscoveryMessage.PASSIVE_BROADCAST, msg.getMessageType() ); + } + + /** + * Test sending a remove broadcast. + * <p> + * @throws Exception on error + */ + public void testRemoveBroadcast() + throws Exception + { + // SETUP + ArrayList cacheNames = new ArrayList(); + + // DO WORK + sender.removeBroadcast( SENDING_HOST, SENDING_PORT, cacheNames, 1L ); + + // VERIFY + // grab the sent message + Object obj = receiver.waitForMessage(); + + assertTrue( "unexpected crap received", obj instanceof UDPDiscoveryMessage ); + + UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; + assertEquals( "wrong host", SENDING_HOST, msg.getHost() ); + assertEquals( "wrong port", SENDING_PORT, msg.getPort() ); + assertEquals( "wrong message type", UDPDiscoveryMessage.REMOVE_BROADCAST, msg.getMessageType() ); + } + + /** + * Test sending a request broadcast. + * <p> + * @throws Exception on error + */ + public void testRequestBroadcast() + throws Exception + { + // DO WORK + sender.requestBroadcast(); + + // VERIFY + // grab the sent message + Object obj = receiver.waitForMessage(); + + assertTrue( "unexpected crap received", obj instanceof UDPDiscoveryMessage ); + + UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj; + assertEquals( "wrong message type", UDPDiscoveryMessage.REQUEST_BROADCAST, msg.getMessageType() ); + } +} Added: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java?rev=794825&view=auto ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java (added) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java Thu Jul 16 20:14:42 2009 @@ -0,0 +1,138 @@ +package org.apache.jcs.utils.discovery; + +import java.util.ArrayList; +import java.util.Iterator; + +import junit.framework.TestCase; + +import org.apache.jcs.auxiliary.MockCacheEventLogger; + +/** Unit tests for the service. */ +public class UDPDiscoveryServiceUnitTest + extends TestCase +{ + /** Verify that the list is updated. */ + public void testAddOrUpdateService_NotInList() + { + // SETUP + String host = "228.5.6.7"; + int port = 6789; + UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); + attributes.setUdpDiscoveryAddr( host ); + attributes.setUdpDiscoveryPort( port ); + attributes.setServicePort( 1000 ); + + // create the service + UDPDiscoveryService service = new UDPDiscoveryService( attributes, new MockCacheEventLogger() ); + service.addParticipatingCacheName( "testCache1" ); + + MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); + service.setDiscoveryListener( discoveryListener ); + + DiscoveredService discoveredService = new DiscoveredService(); + discoveredService.setServiceAddress( host ); + discoveredService.setCacheNames( new ArrayList() ); + discoveredService.setServicePort( 1000 ); + discoveredService.setLastHearFromTime( 100 ); + + // DO WORK + service.addOrUpdateService( discoveredService ); + + // VERIFY + assertTrue( "Service should be in the service list.", service.getDiscoveredServices() + .contains( discoveredService ) ); + assertTrue( "Service should be in the listener list.", discoveryListener.discoveredServices + .contains( discoveredService ) ); + } + + /** Verify that the list is updated. */ + public void testAddOrUpdateService_InList() + { + // SETUP + String host = "228.5.6.7"; + int port = 6789; + UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); + attributes.setUdpDiscoveryAddr( host ); + attributes.setUdpDiscoveryPort( port ); + attributes.setServicePort( 1000 ); + + // create the service + UDPDiscoveryService service = new UDPDiscoveryService( attributes, new MockCacheEventLogger() ); + service.addParticipatingCacheName( "testCache1" ); + + MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); + service.setDiscoveryListener( discoveryListener ); + + DiscoveredService discoveredService = new DiscoveredService(); + discoveredService.setServiceAddress( host ); + discoveredService.setCacheNames( new ArrayList() ); + discoveredService.setServicePort( 1000 ); + discoveredService.setLastHearFromTime( 100 ); + + DiscoveredService discoveredService2 = new DiscoveredService(); + discoveredService2.setServiceAddress( host ); + discoveredService2.setCacheNames( new ArrayList() ); + discoveredService2.setServicePort( 1000 ); + discoveredService2.setLastHearFromTime( 500 ); + + // DO WORK + service.addOrUpdateService( discoveredService ); + // again + service.addOrUpdateService( discoveredService2 ); + + // VERIFY + assertEquals( "Should only be one in the set.", 1, service.getDiscoveredServices().size() ); + assertTrue( "Service should be in the service list.", service.getDiscoveredServices() + .contains( discoveredService ) ); + assertTrue( "Service should be in the listener list.", discoveryListener.discoveredServices + .contains( discoveredService ) ); + + Iterator it = service.getDiscoveredServices().iterator(); + // need to update the time this sucks. add has no effect convert to a map + while ( it.hasNext() ) + { + DiscoveredService service1 = (DiscoveredService) it.next(); + if ( discoveredService.equals( service1 ) ) + { + assertEquals( "The match should have the new last heard from time.", service1.getLastHearFromTime(), + discoveredService2.getLastHearFromTime() ); + } + } + } + + /** Verify that the list is updated. */ + public void testRemoveDiscoveredService() + { + // SETUP + String host = "228.5.6.7"; + int port = 6789; + UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); + attributes.setUdpDiscoveryAddr( host ); + attributes.setUdpDiscoveryPort( port ); + attributes.setServicePort( 1000 ); + + // create the service + UDPDiscoveryService service = new UDPDiscoveryService( attributes, new MockCacheEventLogger() ); + service.addParticipatingCacheName( "testCache1" ); + + MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); + service.setDiscoveryListener( discoveryListener ); + + DiscoveredService discoveredService = new DiscoveredService(); + discoveredService.setServiceAddress( host ); + discoveredService.setCacheNames( new ArrayList() ); + discoveredService.setServicePort( 1000 ); + discoveredService.setLastHearFromTime( 100 ); + + service.addOrUpdateService( discoveredService ); + + // DO WORK + service.removeDiscoveredService( discoveredService ); + + // VERIFY + assertFalse( "Service should not be in the service list.", service.getDiscoveredServices() + .contains( discoveredService ) ); + assertFalse( "Service should not be in the listener list.", discoveryListener.discoveredServices + .contains( discoveredService ) ); + } +} Copied: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java (from r781592, jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryUnitTest.java) URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java?p2=jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java&p1=jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryUnitTest.java&r1=781592&r2=794825&rev=794825&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryUnitTest.java (original) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java Thu Jul 16 20:14:42 2009 @@ -1,4 +1,4 @@ -package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery; +package org.apache.jcs.utils.discovery; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -23,17 +23,8 @@ import junit.framework.TestCase; -import org.apache.jcs.JCS; import org.apache.jcs.auxiliary.MockCacheEventLogger; -import org.apache.jcs.auxiliary.lateral.LateralCache; -import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes; -import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait; -import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade; -import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes; -import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes; -import org.apache.jcs.engine.behavior.ICompositeCacheManager; -import org.apache.jcs.engine.control.CompositeCacheManager; -import org.apache.jcs.utils.serialization.StandardSerializer; +import org.apache.jcs.utils.timing.SleepUtil; /** * Unit tests for discovery @@ -42,71 +33,33 @@ extends TestCase { /** - * Test setup - */ - public void setUp() - { - JCS.setConfigFilename( "/TestUDPDiscovery.ccf" ); - } - - /** - * 1. create the attributes for the service - * <p> - * 2. create the service - * <p> - * 3. create a no wait facade for the service - * <p> - * 4. add the facade to the service under the name testCache1 - * <p> - * 5. create a receiver with the service - * <p> - * 6. create a sender * <p> - * 7.create more names than we have no wait facades for the only one that gets added should be - * testCache1 - * <p> - * 8. send 10 messages - * <p> - * 9. check to see that we got 10 messages - * <p> - * 10. check to see if the testCache1 facade got a nowait. * @throws Exception */ public void testSimpleUDPDiscovery() throws Exception { - // create the attributes for the service - TCPLateralCacheAttributes lac = new TCPLateralCacheAttributes(); - lac.setTransmissionType( LateralCacheAttributes.TCP ); - lac.setTcpServer( "localhost" + ":" + 1111 ); - - ICompositeCacheManager cacheMgr = CompositeCacheManager.getInstance(); + UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes(); + attributes.setUdpDiscoveryAddr( "228.5.6.7" ); + attributes.setUdpDiscoveryPort( 6789 ); + attributes.setServicePort( 1000 ); // create the service - UDPDiscoveryService service = new UDPDiscoveryService( lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(), - lac.getTcpListenerPort(), cacheMgr, - new MockCacheEventLogger(), new StandardSerializer() ); - service.setTcpLateralCacheAttributes( lac ); - - // create a no wait facade for the service - ArrayList noWaits = new ArrayList(); - ILateralCacheAttributes attr = new LateralCacheAttributes(); - attr.setCacheName( "testCache1" ); - - LateralCacheNoWaitFacade lcnwf = new LateralCacheNoWaitFacade( (LateralCacheNoWait[]) noWaits - .toArray( new LateralCacheNoWait[0] ), attr ); + UDPDiscoveryService service = new UDPDiscoveryService( attributes, new MockCacheEventLogger() ); + service.addParticipatingCacheName( "testCache1" ); - // add the facade to the service under the name testCache1 - service.addNoWaitFacade( lcnwf, "testCache1" ); + MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); + service.setDiscoveryListener( discoveryListener ); // create a receiver with the service - UDPDiscoveryReceiver receiver = new UDPDiscoveryReceiver( service, "228.5.6.7", 6789, cacheMgr, - new MockCacheEventLogger(), new StandardSerializer() ); + UDPDiscoveryReceiver receiver = new UDPDiscoveryReceiver( service, attributes.getUdpDiscoveryAddr(), attributes + .getUdpDiscoveryPort() ); Thread t = new Thread( receiver ); t.start(); // create a sender - UDPDiscoverySender sender = new UDPDiscoverySender( "228.5.6.7", 6789 ); + UDPDiscoverySender sender = new UDPDiscoverySender( attributes.getUdpDiscoveryAddr(), attributes + .getUdpDiscoveryPort() ); // create more names than we have no wait facades for // the only one that gets added should be testCache1 @@ -124,80 +77,16 @@ for ( ; cnt < max; cnt++ ) { sender.passiveBroadcast( "localhost", 1111, cacheNames, 1 ); - Thread.sleep( 3 ); + SleepUtil.sleepAtLeast( 20 ); } + SleepUtil.sleepAtLeast( 200 ); + // check to see that we got 10 messages - System.out.println( "Receiver count = " + receiver.getCnt() ); - //assertEquals( "Receiver count should be the same as the number - // sent.", cnt, receiver.getCnt() ); + //System.out.println( "Receiver count = " + receiver.getCnt() ); // request braodcasts change things. - assertTrue( "Receiver count should be the at least the number sent.", cnt <= receiver.getCnt() ); - - Thread.sleep( 2000 ); - - // check to see if the testCache1 facade got a nowait. - assertEquals( "Should have 1", 1, lcnwf.noWaits.length ); - - //ArrayList cacheNames2 = new ArrayList(); - //cacheNames2.add( "testCache1" ); - // add another - //sender.passiveBroadcast( "localhost", 11112, cacheNames2, 1 ); - //Thread.sleep( 30 ); - //assertEquals( "Should have 2", 2, lcnwf.noWaits.length ); - - } - - /** - * Verify that the config does not throw any errors. - * @throws Exception - */ - public void testUDPDiscoveryConfig() - throws Exception - { - JCS jcs = JCS.getInstance( "testCache1" ); - - System.out.println( jcs.getStats() ); - - JCS jcs2 = JCS.getInstance( "testCache2" ); - - System.out.println( jcs2.getStats() ); - - } - - /** - * Make sure the no wait facade doesn't add dupes. - * <p> - * @throws Exception - */ - public void testNoWaitFacadeAdd() - throws Exception - { - ArrayList noWaits = new ArrayList(); - ILateralCacheAttributes attr = new LateralCacheAttributes(); - attr.setCacheName( "testCache1" ); - LateralCacheNoWaitFacade lcnwf = new LateralCacheNoWaitFacade( (LateralCacheNoWait[]) noWaits - .toArray( new LateralCacheNoWait[0] ), attr ); - - TCPLateralCacheAttributes lac = new TCPLateralCacheAttributes(); - lac.setTransmissionType( LateralCacheAttributes.TCP ); - lac.setTcpServer( "localhost" + ":" + 1111 ); - - LateralCache cache = new MockLateralCache( lac ); - - // add one - LateralCacheNoWait noWait = new LateralCacheNoWait( cache ); - lcnwf.addNoWait( noWait ); - assertEquals( "Facade should have 1 no wait", 1, lcnwf.noWaits.length ); - - // add another - LateralCacheNoWait noWait2 = new LateralCacheNoWait( cache ); - lcnwf.addNoWait( noWait2 ); - assertEquals( "Facade should have 2 no waits", 2, lcnwf.noWaits.length ); - - // try adding the same one again - lcnwf.addNoWait( noWait2 ); - assertEquals( "Facade should still have 2 no waits", 2, lcnwf.noWaits.length ); + assertTrue( "Receiver count [" + receiver.getCnt() + "] should be the at least the number sent [" + cnt + "].", + cnt <= receiver.getCnt() ); } } Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java ------------------------------------------------------------------------------ cvs2svn:cvs-rev = 1.1 Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java ------------------------------------------------------------------------------ svn:mergeinfo = --------------------------------------------------------------------- To unsubscribe, e-mail: jcs-dev-unsubscr...@jakarta.apache.org For additional commands, e-mail: jcs-dev-h...@jakarta.apache.org