Author: chirino
Date: Mon Apr 7 08:59:34 2008
New Revision: 645580
URL: http://svn.apache.org/viewvc?rev=645580&view=rev
Log:
Applied patch https://issues.apache.org/activemq/browse/AMQ-1587
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java?rev=645580&r1=645579&r2=645580&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/LdapNetworkConnector.java Mon Apr 7 08:59:34 2008
@@ -17,11 +17,12 @@
package org.apache.activemq.network;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Hashtable;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.naming.CommunicationException;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.directory.Attributes;
@@ -29,8 +30,14 @@
import javax.naming.directory.InitialDirContext;
import javax.naming.directory.SearchControls;
import javax.naming.directory.SearchResult;
+import javax.naming.event.EventDirContext;
+import javax.naming.event.NamespaceChangeListener;
+import javax.naming.event.NamingEvent;
+import javax.naming.event.NamingExceptionEvent;
+import javax.naming.event.ObjectChangeListener;
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,178 +53,405 @@
*
* @org.apache.xbean.XBean element="ldapNetworkConnector"
*/
-public class LdapNetworkConnector extends NetworkConnector {
- private static final Log LOG = LogFactory.getLog(LdapNetworkConnector.class);
-
- // TODO: future >> LDAP JNDI event handling to update connectors?
-
- // force returned entries to implement the ipHost and ipService objectClasses (RFC 2307)
- private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";
-
- // required
- private URI ldapURI;
- private String ldapBase;
- private String ldapUser;
- private String ldapPassword;
-
- // optional
- private int ldapSearchScope = SearchControls.OBJECT_SCOPE;
- private String ldapSearchFilter = REQUIRED_OBJECT_CLASS_FILTER;
-
- // internal configurables
- private DirContext ldapContext;
- private List<NetworkConnector> connectors = new CopyOnWriteArrayList<NetworkConnector>();
-
- /**
- * default constructor
- */
- public LdapNetworkConnector() {
- }
-
- /**
- * sets the LDAP server URI
- *
- * @param uri LDAP server URI
- */
- public void setUri(URI uri) {
- ldapURI = uri;
- }
-
- /**
- * sets the base LDAP dn used for lookup operations
- *
- * @param base LDAP base dn
- */
- public void setBase(String base) {
- ldapBase = base;
- }
-
- /**
- * sets the LDAP user for access credentials
- *
- * @param user LDAP dn of user
- */
- public void setUser(String user) {
- ldapUser = user;
- }
-
- /**
- * sets the LDAP password for access credentials
- *
- * @param password user password
- */
- public void setPassword(String password) {
- ldapPassword = password;
- }
-
- /**
- * sets the LDAP search scope
- *
- * @param searchScope LDAP JNDI search scope
- */
- public void setSearchScope(String searchScope) throws Exception {
- if(searchScope.equals("OBJECT_SCOPE")) {
- ldapSearchScope = SearchControls.OBJECT_SCOPE;
- }
- else if(searchScope.equals("ONELEVEL_SCOPE")) {
- ldapSearchScope = SearchControls.ONELEVEL_SCOPE;
- }
- else if(searchScope.equals("SUBTREE_SCOPE")) {
- ldapSearchScope = SearchControls.SUBTREE_SCOPE;
- }
- else {
- throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
- }
- }
-
- /**
- * sets the LDAP search filter as defined in RFC 2254
- *
- * @param searchFilter LDAP search filter
- * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
- */
- public void setSearchFilter(String searchFilter) {
- ldapSearchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
- }
-
- /**
- * start the connector
- */
- // XXX: this method seems awfully redundant when looking through the
- // call stack when used in NetworkConnector based objects. I don't
- // see why derived classes shouldn't just override the start/stop methods
- protected void handleStart() throws Exception {
- initLdapContext();
- for(URI uri : getLdapUris()) {
- NetworkConnector connector = getBrokerService().addNetworkConnector(uri);
- connector.start();
- connectors.add(connector);
- }
- super.handleStart();
- }
-
- /**
- * stop the connector
- *
- * @param stopper service stopper object
- */
- // XXX: this method seems awfully redundant when looking through the
- // call stack when used in NetworkConnector based objects. I don't
- // see why derived classes shouldn't just override the start/stop methods
- protected void handleStop(ServiceStopper stopper) throws Exception {
- for(NetworkConnector connector : connectors) {
- getBrokerService().removeNetworkConnector(connector);
- connector.stop();
- }
- ldapContext.close();
- super.handleStop(stopper);
- }
-
- /**
- * returns the name of the connector
- *
- * @return connector name
- */
- // XXX: this should probably be fixed elsewhere for all
- // NetworkConnector derivatives...this impl does not
- // seem to be well thought out?
- public String getName() {
- return toString();
- }
-
- /**
- * initializes the LDAP JNDI context with the configured parameters
- */
- protected void initLdapContext() throws Exception {
- Hashtable env = new Hashtable();
- env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
- env.put(Context.PROVIDER_URL, ldapURI.toString());
- env.put(Context.SECURITY_PRINCIPAL, ldapUser);
- env.put(Context.SECURITY_CREDENTIALS, ldapPassword);
- ldapContext = new InitialDirContext(env);
- }
-
- /**
- * retrieves URIs matching the search filter via LDAP
- * and creates network connectors based on the entries
- *
- * @returns list of retrieved URIs
- */
- protected List<URI> getLdapUris() throws Exception {
- SearchControls controls = new SearchControls();
- controls.setSearchScope(ldapSearchScope);
- NamingEnumeration<SearchResult> results = ldapContext.search(ldapBase, ldapSearchFilter, controls);
-
- List<URI> uriList = new ArrayList();
- while(results.hasMore()) {
- Attributes attributes = results.next().getAttributes();
- String address = (String)attributes.get("iphostnumber").get();
- String port = (String)attributes.get("ipserviceport").get();
- String protocol = (String)attributes.get("ipserviceprotocol").get();
- URI uri = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
- LOG.info("Discovered URI " + uri);
- uriList.add(uri);
- }
- return uriList;
- }
+public class LdapNetworkConnector
+ extends NetworkConnector
+ implements NamespaceChangeListener,
+ ObjectChangeListener
+{
+ private static final Log LOG = LogFactory.getLog(LdapNetworkConnector.class);
+
+ // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
+ private static final String REQUIRED_OBJECT_CLASS_FILTER = "(&(objectClass=ipHost)(objectClass=ipService))";
+
+ // connection
+ private URI[] availableURIs = null;
+ private int availableURIsIndex = 0;
+ private String base = null;
+ private boolean failover = false;
+ private long curReconnectDelay = 1000; /* 1 sec */
+ private long maxReconnectDelay = 30000; /* 30 sec */
+
+ // authentication
+ private String user = null;
+ private String password = null;
+ private boolean anonymousAuthentication = false;
+
+ // search
+ private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
+ private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
+ private boolean searchEventListener = false;
+
+ // connector management
+ private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap();
+ private Map<URI, Integer> referenceMap = new ConcurrentHashMap();
+ private Map<String, URI> uuidMap = new ConcurrentHashMap();
+
+ // local context
+ private DirContext context = null;
+
+ /**
+ * returns the next URI from the configured list
+ *
+ * @return next URI from the configured list (round-robin)
+ */
+ public URI getUri()
+ { return availableURIs[++availableURIsIndex % availableURIs.length]; }
+
+ /**
+ * sets the LDAP server URI
+ *
+ * @param _uri LDAP server URI
+ */
+ public void setUri(URI _uri)
+ throws Exception
+ {
+ CompositeData data = URISupport.parseComposite(_uri);
+ if(data.getScheme().equals("failover"))
+ {
+ availableURIs = data.getComponents();
+ failover = true;
+ }
+ else
+ { availableURIs = new URI[]{ _uri }; }
+ }
+
+ /**
+ * sets the base LDAP dn used for lookup operations
+ *
+ * @param _base LDAP base dn
+ */
+ public void setBase(String _base)
+ { base = _base; }
+
+ /**
+ * sets the LDAP user for access credentials
+ *
+ * @param _user LDAP dn of user
+ */
+ public void setUser(String _user)
+ { user = _user; }
+
+ /**
+ * sets the LDAP password for access credentials
+ *
+ * @param _password user password
+ */
+ public void setPassword(String _password)
+ { password = _password; }
+
+ /**
+ * sets LDAP anonymous authentication access credentials
+ *
+ * @param _anonymousAuthentication set to true to use anonymous authentication
+ */
+ public void setAnonymousAuthentication(boolean _anonymousAuthentication)
+ { anonymousAuthentication = _anonymousAuthentication; }
+
+ /**
+ * sets the LDAP search scope
+ *
+ * @param _searchScope LDAP JNDI search scope
+ */
+ public void setSearchScope(String _searchScope)
+ throws Exception
+ {
+ int scope;
+ if(_searchScope.equals("OBJECT_SCOPE"))
+ { scope = SearchControls.OBJECT_SCOPE; }
+ else if(_searchScope.equals("ONELEVEL_SCOPE"))
+ { scope = SearchControls.ONELEVEL_SCOPE; }
+ else if(_searchScope.equals("SUBTREE_SCOPE"))
+ { scope = SearchControls.SUBTREE_SCOPE; }
+ else
+ { throw new Exception("ERR: unknown LDAP search scope specified: " + _searchScope); }
+ searchControls.setSearchScope(scope);
+ }
+
+ /**
+ * sets the LDAP search filter as defined in RFC 2254
+ *
+ * @param _searchFilter LDAP search filter
+ * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
+ */
+ public void setSearchFilter(String _searchFilter)
+ { searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + _searchFilter + "))"; }
+
+ /**
+ * enables/disable a persistent search to the LDAP server as defined
+ * in draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
+ *
+ * @param _searchEventListener enable = true, disable = false (default)
+ * @see <a href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
+ */
+ public void setSearchEventListener(boolean _searchEventListener)
+ { searchEventListener = _searchEventListener; }
+
+ /**
+ * start the connector
+ */
+ public void start()
+ throws Exception
+ {
+ LOG.info("connecting...");
+ Hashtable<String, String> env = new Hashtable();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
+ URI uri = getUri();
+ LOG.debug(" URI [" + uri + "]");
+ env.put(Context.PROVIDER_URL, uri.toString());
+ if(anonymousAuthentication)
+ {
+ LOG.debug(" login credentials [anonymous]");
+ env.put(Context.SECURITY_AUTHENTICATION, "none");
+ }
+ else
+ {
+ LOG.debug(" login credentials [" + user + ":******]");
+ env.put(Context.SECURITY_PRINCIPAL, user);
+ env.put(Context.SECURITY_CREDENTIALS, password);
+ }
+ boolean isConnected = false;
+ while(!isConnected)
+ {
+ try
+ {
+ context = new InitialDirContext(env);
+ isConnected = true;
+ }
+ catch(CommunicationException err)
+ {
+ if(failover)
+ {
+ uri = getUri();
+ LOG.error("connection error [" + env.get(Context.PROVIDER_URL) + "], failover connection to [" + uri.toString() + "]");
+ env.put(Context.PROVIDER_URL, uri.toString());
+ Thread.sleep(curReconnectDelay);
+ curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
+ }
+ else
+ { throw err; }
+ }
+ }
+
+ // add connectors from search results
+ LOG.info("searching for network connectors...");
+ LOG.debug(" base [" + base + "]");
+ LOG.debug(" filter [" + searchFilter + "]");
+ LOG.debug(" scope [" + searchControls.getSearchScope() + "]");
+ NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
+ while(results.hasMore())
+ { addConnector(results.next()); }
+
+ // register persistent search event listener
+ if(searchEventListener)
+ {
+ LOG.info("registering persistent search listener...");
+ EventDirContext eventContext = (EventDirContext)context.lookup("");
+ eventContext.addNamingListener(base, searchFilter, searchControls, this);
+ }
+ else // otherwise close context (i.e. connection as it is no longer needed)
+ { context.close(); }
+ }
+
+ /**
+ * stop the connector
+ */
+ public void stop()
+ throws Exception
+ {
+ LOG.info("stopping context...");
+ for(NetworkConnector connector : connectorMap.values())
+ { connector.stop(); }
+ connectorMap.clear();
+ referenceMap.clear();
+ uuidMap.clear();
+ context.close();
+ }
+
+ /**
+ * returns the name of the connector
+ *
+ * @return connector name
+ */
+ public String getName()
+ { return toString(); }
+
+ /**
+ * add connector of the given URI
+ *
+ * @param result search result of connector to add
+ */
+ protected synchronized void addConnector(SearchResult result)
+ throws Exception
+ {
+ String uuid = toUUID(result);
+ if(uuidMap.containsKey(uuid))
+ {
+ LOG.warn("connector already regsitered for UUID [" + uuid + "]");
+ return;
+ }
+
+ URI connectorURI = toURI(result);
+ if(connectorMap.containsKey(connectorURI))
+ {
+ int referenceCount = referenceMap.get(connectorURI) + 1;
+ LOG.warn("connector reference added for URI [" + connectorURI + "], UUID [" + uuid + "], total reference(s) [" + referenceCount + "]");
+ referenceMap.put(connectorURI, referenceCount);
+ uuidMap.put(uuid, connectorURI);
+ return;
+ }
+
+ // FIXME: disable JMX listing of LDAP managed connectors, we will
+ // want to map/manage these differently in the future
+// boolean useJMX = getBrokerService().isUseJmx();
+// getBrokerService().setUseJmx(false);
+ NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
+// getBrokerService().setUseJmx(useJMX);
+
+ // propogate std connector properties that may have been set via XML
+ connector.setDynamicOnly(isDynamicOnly());
+ connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
+ connector.setNetworkTTL(getNetworkTTL());
+ connector.setConduitSubscriptions(isConduitSubscriptions());
+ connector.setExcludedDestinations(getExcludedDestinations());
+ connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
+ connector.setDuplex(isDuplex());
+
+ // XXX: set in the BrokerService.startAllConnectors method and is
+ // required to prevent remote broker exceptions upon connection
+ connector.setLocalUri(getBrokerService().getVmConnectorURI());
+ connector.setBrokerName(getBrokerService().getBrokerName());
+ connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
+
+ // start network connector
+ connectorMap.put(connectorURI, connector);
+ referenceMap.put(connectorURI, 1);
+ uuidMap.put(uuid, connectorURI);
+ connector.start();
+ LOG.info("connector added with URI [" + connectorURI + "]");
+ }
+
+ /**
+ * remove connector of the given URI
+ *
+ * @param result search result of connector to remove
+ */
+ protected synchronized void removeConnector(SearchResult result)
+ throws Exception
+ {
+ String uuid = toUUID(result);
+ if(!uuidMap.containsKey(uuid))
+ {
+ LOG.warn("connector not regsitered for UUID [" + uuid + "]");
+ return;
+ }
+
+ URI connectorURI = uuidMap.get(uuid);
+ if(!connectorMap.containsKey(connectorURI))
+ {
+ LOG.warn("connector not regisitered for URI [" + connectorURI + "]");
+ return;
+ }
+
+ int referenceCount = referenceMap.get(connectorURI) - 1;
+ referenceMap.put(connectorURI, referenceCount);
+ uuidMap.remove(uuid);
+ LOG.debug("connector referenced removed for URI [" + connectorURI + "], UUID [" + uuid + "], remaining reference(s) [" + referenceCount + "]");
+
+ if(referenceCount > 0)
+ { return; }
+
+ NetworkConnector connector = connectorMap.remove(connectorURI);
+ connector.stop();
+ LOG.info("connector removed with URI [" + connectorURI + "]");
+ }
+
+ /**
+ * convert search result into URI
+ *
+ * @param result search result to convert to URI
+ */
+ protected URI toURI(SearchResult result)
+ throws Exception
+ {
+ Attributes attributes = result.getAttributes();
+ String address = (String)attributes.get("iphostnumber").get();
+ String port = (String)attributes.get("ipserviceport").get();
+ String protocol = (String)attributes.get("ipserviceprotocol").get();
+ URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
+ LOG.debug("retrieved URI from SearchResult [" + connectorURI + "]");
+ return connectorURI;
+ }
+
+ /**
+ * convert search result into UUID (the entry's name in its namespace)
+ *
+ * @param result search result to convert to UUID
+ */
+ protected String toUUID(SearchResult result)
+ {
+ String uuid = result.getNameInNamespace();
+ LOG.debug("retrieved UUID from SearchResult [" + uuid + "]");
+ return uuid;
+ }
+
+ /**
+ * invoked when an entry has been added during a persistent search
+ */
+ public void objectAdded(NamingEvent event)
+ {
+ LOG.debug("entry added");
+ try
+ { addConnector((SearchResult)event.getNewBinding()); }
+ catch(Exception err)
+ { LOG.error("ERR: caught unexpected exception", err); }
+ }
+
+ /**
+ * invoked when an entry has been removed during a persistent search
+ */
+ public void objectRemoved(NamingEvent event)
+ {
+ LOG.debug("entry removed");
+ try
+ { removeConnector((SearchResult)event.getOldBinding()); }
+ catch(Exception err)
+ { LOG.error("ERR: caught unexpected exception", err); }
+ }
+
+ /**
+ * invoked when an entry has been renamed during a persistent search
+ */
+ public void objectRenamed(NamingEvent event)
+ {
+ LOG.debug("entry renamed");
+ // XXX: getNameInNamespace method does not seem to work properly,
+ // but getName seems to provide the result we want
+ String uuidOld = event.getOldBinding().getName();
+ String uuidNew = event.getNewBinding().getName();
+ URI connectorURI = uuidMap.remove(uuidOld);
+ uuidMap.put(uuidNew, connectorURI);
+ LOG.debug("connector reference renamed for URI [" + connectorURI + "], Old UUID [" + uuidOld + "], New UUID [" + uuidNew + "]");
+ }
+
+ /**
+ * invoked when an entry has been changed during a persistent search
+ */
+ public void objectChanged(NamingEvent event)
+ {
+ LOG.debug("entry changed");
+ try
+ {
+ SearchResult result = (SearchResult)event.getNewBinding();
+ removeConnector(result);
+ addConnector(result);
+ }
+ catch(Exception err)
+ { LOG.error("ERR: caught unexpected exception", err); }
+ }
+
+ /**
+ * invoked when an exception has occurred during a persistent search
+ */
+ public void namingExceptionThrown(NamingExceptionEvent event)
+ { LOG.error("ERR: caught unexpected exception", event.getException()); }
}