Repository: qpid-jms Updated Branches: refs/heads/master 8c9456bfd -> 5b3c02920
https://issues.apache.org/jira/browse/QPIDJMS-46 Improvements in the discovery module. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5b3c0292 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5b3c0292 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5b3c0292 Branch: refs/heads/master Commit: 5b3c02920f336df0596ef3c7aa93283a4f369387 Parents: 8c9456b Author: Timothy Bish <[email protected]> Authored: Thu May 7 19:19:17 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu May 7 19:19:17 2015 -0400 ---------------------------------------------------------------------- .../jms/provider/discovery/DiscoveryAgent.java | 20 ++ .../jms/provider/discovery/DiscoveryEvent.java | 45 ---- .../provider/discovery/DiscoveryListener.java | 14 +- .../provider/discovery/DiscoveryProvider.java | 99 ++++--- .../discovery/DiscoveryProviderFactory.java | 52 ++-- .../file/FileWatcherDiscoveryAgent.java | 260 +++++++++++++++++++ .../file/FileWatcherDiscoveryAgentFactory.java | 55 ++++ .../discovery/multicast/DiscoveryEvent.java | 47 ++++ .../multicast/MulticastDiscoveryAgent.java | 24 +- .../MulticastDiscoveryAgentFactory.java | 11 +- .../discovery/multicast/PacketParser.java | 2 - .../multicast/parsers/ActiveMQPacketParser.java | 27 +- .../org/apache/qpid/jms/provider/agents/file | 17 ++ .../jms/discovery/FileWatcherDiscoveryTest.java | 220 ++++++++++++++++ .../src/test/resources/log4j.properties | 2 +- 15 files changed, 775 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java 
b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java index 051f567..faa393b 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider.discovery; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; /** * Interface for all agents used to detect instances of remote peers on the network. @@ -24,6 +25,25 @@ import java.io.IOException; public interface DiscoveryAgent { /** + * Indicates if this DiscoveryAgent requires a ScheduledExecutorService in order + * to perform its discovery work. + * + * @return true if the agent requires that its parent provide it with a scheduler. + */ + boolean isSchedulerRequired(); + + /** + * Provide a ScheduledExecutorService to the DiscoveryAgent that requires a + * scheduler to perform its discovery work. If the agent performs long polling + * style operations such as a socket read then it should not use the provided + * scheduler as that could block other agents from performing their own work. + * + * @param scheduler + * An initialized Scheduler service that this agent can use for its work. 
+ */ + void setScheduler(ScheduledExecutorService scheduler); + + /** * Sets the discovery listener * * @param listener http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java deleted file mode 100644 index 0fc2f29..0000000 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.discovery; - -/** - * Event class used to convey discovered remote peer information to the - * DiscoveryProvider. 
- */ -public class DiscoveryEvent { - - public enum EventType { - ALIVE, - SHUTDOWN - }; - - private final String peerUri; - private final EventType type; - - public DiscoveryEvent(String peerUri, EventType type) { - this.peerUri = peerUri; - this.type = type; - } - - public String getPeerUri() { - return peerUri; - } - - public EventType getType() { - return type; - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java index 07e9895..e92bbb2 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms.provider.discovery; +import java.net.URI; + /** * A listener of services being added or removed from a network */ @@ -24,17 +26,17 @@ public interface DiscoveryListener { /** * Called when a DiscoveryAgent becomes aware of a new remote peer. * - * @param event - * the event data which contains the peer address and optional name. + * @param remoteURI + * the URI of the newly discovered peer. */ - void onServiceAdd(DiscoveryEvent event); + void onServiceAdd(URI remoteURI); /** * Called when a DiscoveryAgent can no longer detect a previously known remote peer. * - * @param event - * the event data which contains the peer address and optional name. + * @param remoteURI + * the URI of the previously discovered peer that is no longer active. 
*/ - void onServiceRemove(DiscoveryEvent event); + void onServiceRemove(URI remoteURI); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java index 1d3a0f0..739a7a0 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java @@ -18,26 +18,31 @@ package org.apache.qpid.jms.provider.discovery; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.ConcurrentHashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import org.apache.qpid.jms.provider.ProviderWrapper; import org.apache.qpid.jms.provider.failover.FailoverProvider; +import org.apache.qpid.jms.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An AsyncProvider instance that wraps the FailoverProvider and listens for - * events about discovered remote peers using a configured DiscoveryAgent - * instance. + * An Provider instance that wraps the FailoverProvider and listens for events + * about discovered remote peers using a configured set of DiscoveryAgent instance. 
*/ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> implements DiscoveryListener { private static final Logger LOG = LoggerFactory.getLogger(DiscoveryProviderFactory.class); private final URI discoveryUri; - private DiscoveryAgent discoveryAgent; - private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>(); + private final List<DiscoveryAgent> discoveryAgents = new ArrayList<DiscoveryAgent>(); + + private ScheduledExecutorService sharedScheduler; /** * Creates a new instance of the DiscoveryProvider. @@ -56,19 +61,29 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme @Override public void start() throws IOException, IllegalStateException { - if (this.discoveryAgent == null) { + if (discoveryAgents.isEmpty()) { throw new IllegalStateException("No DiscoveryAgent configured."); } - discoveryAgent.setDiscoveryListener(this); - discoveryAgent.start(); + for (DiscoveryAgent discoveryAgent : discoveryAgents) { + discoveryAgent.setDiscoveryListener(this); + if (discoveryAgent.isSchedulerRequired()) { + discoveryAgent.setScheduler(getSharedScheduler()); + } + discoveryAgent.start(); + } super.start(); } @Override public void close() { - discoveryAgent.close(); + ThreadPoolUtils.shutdownGraceful(sharedScheduler); + + for (DiscoveryAgent discoveryAgent : discoveryAgents) { + discoveryAgent.close(); + } + super.close(); } @@ -82,10 +97,10 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme } /** - * @return the configured DiscoveryAgent instance used by this DiscoveryProvider. + * @return a list of configured DiscoveryAgent instances used by this DiscoveryProvider. 
*/ - public DiscoveryAgent getDiscoveryAgent() { - return this.discoveryAgent; + public List<DiscoveryAgent> getDiscoveryAgents() { + return Collections.unmodifiableList(discoveryAgents); } /** @@ -94,32 +109,25 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme * @param agent * the agent to use to discover remote peers */ - public void setDiscoveryAgent(DiscoveryAgent agent) { - this.discoveryAgent = agent; + public void setDiscoveryAgents(List<DiscoveryAgent> agents) { + discoveryAgents.addAll(agents); } //------------------- Discovery Event Handlers ---------------------------// @Override - public void onServiceAdd(DiscoveryEvent event) { - String url = event.getPeerUri(); - if (url != null) { - try { - URI uri = new URI(url); - LOG.info("Adding new peer connection URL: {}", uri); - serviceURIs.put(event.getPeerUri(), uri); - next.add(uri); - } catch (URISyntaxException e) { - LOG.warn("Could not add remote URI: {} due to bad URI syntax: {}", url, e.getMessage()); - } + public void onServiceAdd(URI remoteURI) { + if (remoteURI != null) { + LOG.debug("Adding URI of remote peer: {}", remoteURI); + next.add(remoteURI); } } @Override - public void onServiceRemove(DiscoveryEvent event) { - URI uri = serviceURIs.get(event.getPeerUri()); - if (uri != null) { - next.remove(uri); + public void onServiceRemove(URI remoteURI) { + if (remoteURI != null) { + LOG.debug("Removing URI of remote peer: {}", remoteURI); + next.remove(remoteURI); } } @@ -127,13 +135,38 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme @Override public void onConnectionInterrupted(URI remoteURI) { - this.discoveryAgent.resume(); + for (DiscoveryAgent discoveryAgent : discoveryAgents) { + discoveryAgent.resume(); + } + super.onConnectionInterrupted(remoteURI); } @Override public void onConnectionRestored(URI remoteURI) { - this.discoveryAgent.suspend(); + for (DiscoveryAgent discoveryAgent : discoveryAgents) { + 
discoveryAgent.suspend(); + } + super.onConnectionRestored(remoteURI); } + + //----- Internal implementation ------------------------------------------// + + private ScheduledExecutorService getSharedScheduler() { + if (sharedScheduler == null) { + sharedScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(DiscoveryProvider.this.getClass().getSimpleName() + ":[" + getDiscoveryURI() + "]"); + return serial; + } + }); + } + + return sharedScheduler; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java index 33d4990..a989f49 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java @@ -17,7 +17,8 @@ package org.apache.qpid.jms.provider.discovery; import java.net.URI; -import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.qpid.jms.provider.Provider; @@ -32,7 +33,15 @@ import org.apache.qpid.jms.util.URISupport.CompositeData; */ public class DiscoveryProviderFactory extends ProviderFactory { - private static final String DISCOVERED_OPTION_PREFIX = "discovered."; + /** + * Prefix used for all properties that apply specifically to the DiscoveryProvider + */ + public static final String DISCOVERY_OPTION_PREFIX = "discovery."; + + /** + * Prefix used for all properties that 
should be applied to any discovered remote URIs. + */ + public static final String DISCOVERED_OPTION_PREFIX = "discovered."; @Override public Provider createProvider(URI remoteURI) throws Exception { @@ -40,28 +49,33 @@ public class DiscoveryProviderFactory extends ProviderFactory { CompositeData composite = URISupport.parseComposite(remoteURI); Map<String, String> options = composite.getParameters(); - // We currently only allow for one agent to feed URIs to the embedded FailoverProvider - // in the DiscoveryProvider. We could allow more in the future if we found that to be - // a useful feature. - if (composite.getComponents().size() > 1) { - throw new URISyntaxException(remoteURI.toString(), "Only one discovery agent can be specified"); - } + Map<String, String> discoveryOptions = PropertyUtil.filterProperties(options, DISCOVERY_OPTION_PREFIX); + Map<String, String> discoveredOptions = PropertyUtil.filterProperties(options, DISCOVERED_OPTION_PREFIX); // Failover will apply the nested options to each URI while attempting to connect. - Map<String, String> nested = PropertyUtil.filterProperties(options, DISCOVERED_OPTION_PREFIX); - FailoverProvider failover = new FailoverProvider(nested); - PropertyUtil.setProperties(failover, options); - - // TODO - Revisit URI options setting and enhance the ProperyUtils to provide a - // means of setting some properties on a object and obtaining the leftovers - // so we can pass those along to the next until we consume them all or we - // have leftovers which implies a bad URI. 
+ FailoverProvider failover = new FailoverProvider(discoveredOptions); + discoveryOptions = PropertyUtil.setProperties(failover, discoveryOptions); DiscoveryProvider discovery = new DiscoveryProvider(remoteURI, failover); - PropertyUtil.setProperties(discovery, options); + discoveryOptions = PropertyUtil.setProperties(discovery, discoveryOptions); + + if (!discoveryOptions.isEmpty()) { + String msg = "" + + " Not all options could be set on the Discovery provider." + + " Check the options are spelled correctly." + + " Unused parameters=[" + discoveryOptions + "]." + + " This Provider cannot be started."; + throw new IllegalArgumentException(msg); + } + + List<URI> agentURIs = composite.getComponents(); + List<DiscoveryAgent> discoveryAgents = new ArrayList<DiscoveryAgent>(agentURIs.size()); + + for (URI agentURI : agentURIs) { + discoveryAgents.add(DiscoveryAgentFactory.createAgent(agentURI)); + } - DiscoveryAgent agent = DiscoveryAgentFactory.createAgent(composite.getComponents().get(0)); - discovery.setDiscoveryAgent(agent); + discovery.setDiscoveryAgents(discoveryAgents); return discovery; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java new file mode 100644 index 0000000..832d64c --- /dev/null +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.discovery.file; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.jms.provider.discovery.DiscoveryAgent; +import org.apache.qpid.jms.provider.discovery.DiscoveryListener; +import org.apache.qpid.jms.util.ThreadPoolUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Discovery agent that watches a file and periodically reads in remote URIs + * from that file. 
+ */ +public class FileWatcherDiscoveryAgent implements DiscoveryAgent { + + private static final Logger LOG = LoggerFactory.getLogger(FileWatcherDiscoveryAgent.class); + + private static final int DEFAULT_UPDATE_INTERVAL = 30000; + + private ScheduledExecutorService scheduler; + private final Set<URI> discovered = new LinkedHashSet<URI>(); + + private final URI discoveryURI; + private final AtomicBoolean started = new AtomicBoolean(false); + + private DiscoveryListener listener; + private int updateInterval = DEFAULT_UPDATE_INTERVAL; + private boolean warnOnWatchedReadError; + + public FileWatcherDiscoveryAgent(URI discoveryURI) { + this.discoveryURI = discoveryURI; + } + + @Override + public void setDiscoveryListener(DiscoveryListener listener) { + this.listener = listener; + } + + public DiscoveryListener getDiscoveryListener() { + return this.listener; + } + + @Override + public boolean isSchedulerRequired() { + return true; + } + + @Override + public void setScheduler(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + @Override + public void start() throws IOException, IllegalStateException { + if (listener == null) { + throw new IllegalStateException("No DiscoveryListener configured."); + } + + if (scheduler == null) { + throw new IllegalStateException("No scheduler service has been provided."); + } + + if (started.compareAndSet(false, true)) { + scheduler.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + LOG.debug("Performing watched resources scheduled update: {}", getDiscvoeryURI()); + updateWatchedResources(); + } + }, 0, getUpdateInterval(), TimeUnit.MILLISECONDS); + } + } + + @Override + public void close() { + if (started.compareAndSet(true, false)) { + ThreadPoolUtils.shutdownGraceful(scheduler); + } + } + + @Override + public void suspend() { + // We don't suspend watching the file updates are passive. 
+ } + + @Override + public void resume() { + // We don't suspend watching the file updates are passive. + } + + @Override + public String toString() { + return "FileWatcherDiscoveryAgent: listener:" + getDiscvoeryURI(); + } + + // ---------- Property Accessors ------------------------------------------// + + /** + * @return the original URI used to create the Discovery Agent. + */ + public URI getDiscvoeryURI() { + return this.discoveryURI; + } + + /** + * @return the configured resource update interval. + */ + public int getUpdateInterval() { + return updateInterval; + } + + /** + * @param updateInterval + * the update interval to use for watching resources for changes. + */ + public void setUpdateInterval(int updateInterval) { + this.updateInterval = updateInterval; + } + + //----- Internal implementation ------------------------------------------// + + private void updateWatchedResources() { + String fileURL = getDiscvoeryURI().toString(); + if (fileURL != null) { + BufferedReader in = null; + String newUris = null; + StringBuffer buffer = new StringBuffer(); + + try { + in = new BufferedReader(getURLStream(fileURL)); + while (true) { + String line = in.readLine(); + if (line == null) { + break; + } + buffer.append(line); + } + newUris = buffer.toString(); + } catch (IOException ioe) { + if (!warnOnWatchedReadError) { + LOG.warn("Failed to read watched resource: " + fileURL); + LOG.trace("Resource read error:", ioe); + } else { + LOG.debug("Failed to read watched resource: " + fileURL); + LOG.trace("Resource read error:", ioe); + } + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ioe) { + // ignore + } + } + } + + processURIs(newUris); + } + } + + private InputStreamReader getURLStream(String path) throws IOException { + InputStreamReader result = null; + URL url = null; + + try { + url = new URL(path); + result = new InputStreamReader(url.openStream()); + } catch (MalformedURLException e) { + // ignore - it could be a path to a a 
local file + } + + if (result == null) { + result = new FileReader(path); + } + + return result; + } + + private final void processURIs(String updatedURIs) { + if (updatedURIs != null) { + updatedURIs = updatedURIs.trim(); + if (!updatedURIs.isEmpty()) { + List<URI> list = new ArrayList<URI>(); + StringTokenizer tokenizer = new StringTokenizer(updatedURIs, ","); + while (tokenizer.hasMoreTokens()) { + String str = tokenizer.nextToken(); + try { + URI uri = new URI(str); + list.add(uri); + } catch (Exception e) { + LOG.error("Failed to parse broker address: " + str, e); + } + } + if (list.isEmpty() == false) { + try { + updateURIs(list); + } catch (IOException e) { + LOG.error("Failed to update transport URI's from: " + updatedURIs, e); + } + } + } + } + } + + private void updateURIs(List<URI> updates) throws IOException { + + // Remove any previously discovered URIs that are no longer in the watched resource + HashSet<URI> removedPeers = new HashSet<URI>(discovered); + removedPeers.removeAll(updates); + + for (URI removed : removedPeers) { + listener.onServiceRemove(removed); + } + + // Only add the newly discovered remote peers + HashSet<URI> addedPeers = new HashSet<URI>(updates); + addedPeers.removeAll(discovered); + + for (URI addition : addedPeers) { + listener.onServiceAdd(addition); + } + + // Now just store the new set of URIs and advertise them. 
+ discovered.clear(); + discovered.addAll(updates); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java new file mode 100644 index 0000000..e1108b4 --- /dev/null +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.discovery.file; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.provider.discovery.DiscoveryAgent; +import org.apache.qpid.jms.provider.discovery.DiscoveryAgentFactory; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.jms.util.URISupport; + +/** + * Creates and configures a new instance of the file watcher based agent. 
+ */ +public class FileWatcherDiscoveryAgentFactory extends DiscoveryAgentFactory { + + @Override + public DiscoveryAgent createDiscoveryAgent(URI discoveryURI) throws Exception { + FileWatcherDiscoveryAgent agent = new FileWatcherDiscoveryAgent(discoveryURI); + + Map<String, String> options = URISupport.parseParameters(discoveryURI); + options = PropertyUtil.setProperties(agent, options); + + if (!options.isEmpty()) { + String msg = "" + + " Not all options could be set on the File Watcher discovery." + + " agent. Check the options are spelled correctly." + + " Unused parameters=[" + options + "]." + + " This agent cannot be started."; + throw new IllegalArgumentException(msg); + } + + return agent; + } + + @Override + public String getName() { + return "File-Watcher"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java new file mode 100644 index 0000000..99b40fb --- /dev/null +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.discovery.multicast; + +import java.net.URI; + +/** + * Event class used to convey discovered remote peer information to the + * DiscoveryProvider. + */ +public class DiscoveryEvent { + + public enum EventType { + ALIVE, + SHUTDOWN + }; + + private final URI peerUri; + private final EventType type; + + public DiscoveryEvent(URI peerUri, EventType type) { + this.peerUri = peerUri; + this.type = type; + } + + public URI getPeerUri() { + return peerUri; + } + + public EventType getType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java index 885f70c..9a67458 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java @@ -35,12 +35,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import 
org.apache.qpid.jms.provider.discovery.DiscoveryAgent; -import org.apache.qpid.jms.provider.discovery.DiscoveryEvent; -import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType; import org.apache.qpid.jms.provider.discovery.DiscoveryListener; +import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +70,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { private URI discoveryURI; private int timeToLive = 1; private boolean loopBackMode; - private final Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>(); + private final Map<URI, RemoteBrokerData> brokersByService = new ConcurrentHashMap<URI, RemoteBrokerData>(); private String group = "default"; private InetAddress inetAddress; private SocketAddress sockAddress; @@ -98,6 +98,16 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } @Override + public void setScheduler(ScheduledExecutorService scheduler) { + // Not needed for this agent + } + + @Override + public boolean isSchedulerRequired() { + return false; + } + + @Override public void start() throws IOException, IllegalStateException { if (listener == null) { throw new IllegalStateException("No DiscoveryListener configured."); @@ -233,7 +243,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { private void processAlive(DiscoveryEvent event) { RemoteBrokerData data = brokersByService.get(event.getPeerUri()); if (data == null) { - String peerUri = event.getPeerUri(); + URI peerUri = event.getPeerUri(); data = new RemoteBrokerData(event.getPeerUri()); brokersByService.put(peerUri, data); fireServiceAddEvent(data); @@ -261,13 +271,13 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { private void fireServiceRemovedEvent(final RemoteBrokerData data) { if (listener != null && started.get()) { - 
listener.onServiceRemove(data); + listener.onServiceRemove(data.getPeerUri()); } } private void fireServiceAddEvent(final RemoteBrokerData data) { if (listener != null && started.get()) { - listener.onServiceAdd(data); + listener.onServiceAdd(data.getPeerUri()); } } @@ -436,7 +446,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { long lastHeartBeat; - public RemoteBrokerData(String peerUri) { + public RemoteBrokerData(URI peerUri) { super(peerUri, EventType.ALIVE); this.lastHeartBeat = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java index 6730f33..a20a4b2 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java @@ -35,7 +35,16 @@ public class MulticastDiscoveryAgentFactory extends DiscoveryAgentFactory { public DiscoveryAgent createDiscoveryAgent(URI discoveryURI) throws Exception { MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(discoveryURI); Map<String, String> options = URISupport.parseParameters(discoveryURI); - PropertyUtil.setProperties(agent, options); + + options = PropertyUtil.setProperties(agent, options); + if (!options.isEmpty()) { + String msg = "" + + " Not all options could be set on the Multicast discovery." + + " agent. Check the options are spelled correctly." + + " Unused parameters=[" + options + "]." 
+ + " This agent cannot be started."; + throw new IllegalArgumentException(msg); + } String service = agent.getService(); if (service == null || service.isEmpty()) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java index 294234e..3c45bd6 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.jms.provider.discovery.multicast; -import org.apache.qpid.jms.provider.discovery.DiscoveryEvent; - /** * Interface for a DatagramPacket parser object which is used by the * MulticastDiscoveryAget to parse incoming packets to determine the http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java index 1cb2935..e2fa1d1 100644 --- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java +++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java @@ -16,15 +16,22 @@ */ package org.apache.qpid.jms.provider.discovery.multicast.parsers; 
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent; -import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent; +import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent.EventType; import org.apache.qpid.jms.provider.discovery.multicast.PacketParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Parser instance for ActiveMQ multicast discovery processing. */ public class ActiveMQPacketParser implements PacketParser { + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPacketParser.class); + private static final String TYPE_SUFFIX = "ActiveMQ-4."; private static final String ALIVE = "alive."; private static final String DEAD = "dead."; @@ -50,12 +57,20 @@ public class ActiveMQPacketParser implements PacketParser { String payload = str.substring(getType().length()); if (payload.startsWith(ALIVE)) { String brokerName = getBrokerName(payload.substring(ALIVE.length())); - String brokerUri = payload.substring(ALIVE.length() + brokerName.length() + 2); - event = new DiscoveryEvent(brokerUri, EventType.ALIVE); + try { + String brokerUri = payload.substring(ALIVE.length() + brokerName.length() + 2); + event = new DiscoveryEvent(new URI(brokerUri), EventType.ALIVE); + } catch (URISyntaxException ex) { + LOG.warn("Published URI has invalid URI syntax, ignoring: {}", payload); + } } else { String brokerName = getBrokerName(payload.substring(DEAD.length())); - String brokerUri = payload.substring(DEAD.length() + brokerName.length() + 2); - event = new DiscoveryEvent(brokerUri, EventType.SHUTDOWN); + try { + String brokerUri = payload.substring(DEAD.length() + brokerName.length() + 2); + event = new DiscoveryEvent(new URI(brokerUri), EventType.SHUTDOWN); + } catch (URISyntaxException ex) { + LOG.warn("Published URI has invalid URI syntax, ignoring: {}", payload); + } } } 
return event; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file ---------------------------------------------------------------------- diff --git a/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file new file mode 100644 index 0000000..0bf8b3a --- /dev/null +++ b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.qpid.jms.provider.discovery.file.FileWatcherDiscoveryAgentFactory http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java new file mode 100644 index 0000000..1c22e25 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.discovery; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that the file watcher Discovery Provider finds a broker URI in + * the file it is directed to watch. + */ +public class FileWatcherDiscoveryTest extends AmqpTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(FileWatcherDiscoveryTest.class); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("./target")); + + private CountDownLatch connected; + private CountDownLatch interrupted; + private CountDownLatch restored; + private JmsConnection jmsConnection; + + private File primaryBrokerList; + private File secondaryBrokerList; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + connected = new CountDownLatch(1); + interrupted = new CountDownLatch(1); + restored = new CountDownLatch(1); + + primaryBrokerList = folder.newFile("primaryBrokerURIsFile.txt"); + secondaryBrokerList = folder.newFile("secondaryBrokerURIsFile.txt"); + + LOG.info("Broker URIs going to file: {}", primaryBrokerList); + + writeOutBrokerURIsToFile(primaryBrokerList); + } + + @Test(timeout = 60000) + public void testConnectedToStoredBrokerURI() throws Exception { + assertTrue(primaryBrokerList.exists()); + + connection = createConnection(); + 
connection.start(); + + assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testReconnectWhenURIUpdates() throws Exception { + assertTrue(primaryBrokerList.exists()); + + connection = createConnection(); + connection.start(); + + assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS)); + + startPrimaryBroker(); + + writeOutBrokerURIsToFile(primaryBrokerList); + + assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testReconnectUsingTwoFiles() throws Exception { + assertTrue(primaryBrokerList.exists()); + assertTrue(secondaryBrokerList.exists()); + + connection = createConnection(new File[]{ primaryBrokerList, secondaryBrokerList }); + connection.start(); + + assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS)); + + startPrimaryBroker(); + + writeOutBrokerURIsToFile(secondaryBrokerList); + + assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testWithInitiallyNonExistingFile() throws Exception { + assertTrue(primaryBrokerList.exists()); + + final String FILENAME = "nonExistentFile.txt"; + + File nonExistentFile = new File(folder.getRoot(), FILENAME); + assertFalse(nonExistentFile.exists()); + + connection = createConnection(new File[]{ primaryBrokerList, nonExistentFile }); + connection.start(); + + assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS)); + + startPrimaryBroker(); + + 
folder.newFile(FILENAME); + assertTrue(nonExistentFile.exists()); + + writeOutBrokerURIsToFile(nonExistentFile); + + assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS)); + } + + protected Connection createConnection() throws Exception { + return createConnection(new File[]{ primaryBrokerList }); + } + + protected Connection createConnection(File[] filesToWatch) throws Exception { + + StringBuilder fileURIs = new StringBuilder(); + for (File file : filesToWatch) { + if (fileURIs.length() == 0) { + fileURIs.append(file.toURI()); + fileURIs.append("?updateInterval=1000"); + } else { + fileURIs.append(","); + fileURIs.append(file.toURI()); + fileURIs.append("?updateInterval=1000"); + } + } + + JmsConnectionFactory factory = new JmsConnectionFactory( + "discovery:(" + fileURIs.toString() + ")?discovery.maxReconnectDelay=500"); + connection = factory.createConnection(); + + jmsConnection = (JmsConnection) connection; + jmsConnection.addConnectionListener(new JmsConnectionListener() { + + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection reports established. Connected to -> {}", remoteURI); + connected.countDown(); + } + + @Override + public void onConnectionInterrupted(URI remoteURI) { + LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI); + interrupted.countDown(); + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection reports restored. 
Connected to -> {}", remoteURI); + restored.countDown(); + } + + @Override + public void onConnectionFailure(Throwable error) { + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + } + }); + + return connection; + } + + protected void writeOutBrokerURIsToFile(File targetFile) throws Exception { + try (FileOutputStream out = new FileOutputStream(targetFile);) { + for (URI brokerURI : getBrokerURIs()) { + LOG.info("Broker URI being written: {}", brokerURI); + out.write(brokerURI.toString().getBytes("UTF-8")); + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties index f69efa0..2b107ef 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties @@ -21,7 +21,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.qpid.jms=INFO -log4j.logger.org.apache.qpid.jms.provider.failover=TRACE +log4j.logger.org.apache.qpid.jms.provider=DEBUG # Tune the ActiveMQ and it's AMQP transport as needed for debugging. log4j.logger.org.apache.activemq=INFO --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
