Repository: activemq-artemis Updated Branches: refs/heads/master 7dfde208c -> 54d9a3e9b
ARTEMIS-208 BrokerInfo issue, also: enlarged the default max size for tests to avoid send blocking. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34e127cc Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34e127cc Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34e127cc Branch: refs/heads/master Commit: 34e127cc0ce1f53c354a9d08b3e0daa747b14f3e Parents: 7dfde20 Author: Howard Gao <[email protected]> Authored: Mon Aug 24 10:28:25 2015 +0800 Committer: Clebert Suconic <[email protected]> Committed: Mon Aug 24 22:54:12 2015 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 4 +++- .../openwire/OpenWireProtocolManager.java | 21 ++++++++++++++++++++ .../core/remoting/impl/netty/NettyAcceptor.java | 4 ++++ .../artemiswrapper/ArtemisBrokerWrapper.java | 4 ++-- 4 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 30ffb06..3cddb29 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -270,6 +270,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { private void negotiate(WireFormatInfo command) throws IOException { this.wireFormat.renegotiateWireFormat(command); + //throw back a brokerInfo here + protocolManager.sendBrokerInfo(this); } @Override @@ -1084,7 +1086,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { } } } - catch (Exception e) { + catch (Throwable e) { if (e instanceof ActiveMQSecurityException) { resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8c20c46..5489fdf 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdap import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -67,6 +68,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionId; @@ -135,6 +137,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No private final ScheduledExecutorService scheduledPool; + private BrokerInfo brokerInfo = new BrokerInfo(); + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -148,6 +152,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No if (service != null) { service.addNotificationListener(this); } + brokerInfo.setBrokerName(server.getIdentity()); + brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString())); + brokerInfo.setPeerBrokerInfos(null); + brokerInfo.setFaultTolerantConfiguration(false); + brokerInfo.setBrokerURL(null); + } public ProtocolManagerFactory<Interceptor> getFactory() { @@ -162,6 +172,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) { + if (brokerInfo.getBrokerURL() == null) { + NettyAcceptor nettyAcceptor = (NettyAcceptor)acceptorUsed; + brokerInfo.setBrokerURL(nettyAcceptor.getURL()); + } OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf); owConn.init(); @@ -693,4 +707,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName())); server.destroyQueue(subQueueName); } + + public void sendBrokerInfo(OpenWireConnection connection) { + BrokerInfo copy = brokerInfo.copy(); + //cluster support yet to support + copy.setPeerBrokerInfos(null); + connection.dispatchAsync(copy); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index f0fafaf..a53b886 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -579,6 +579,10 @@ public class NettyAcceptor implements Acceptor { } return sb.toString(); } + + public String getURL() { + return "tcp://" + this.host + ":" + this.port; + } // Inner classes ----------------------------------------------------------------------------- private final class ActiveMQServerChannelHandler extends ActiveMQChannelHandler implements ConnectionCreator { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34e127cc/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index bf92a9c..723529f 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -200,7 +200,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); } if (entry.isProducerFlowControl()) { - settings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + settings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); if (bservice.getSystemUsage().isSendFailIfNoSpace()) { settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); } @@ -215,7 +215,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { settingsMap.put("#", defSettings); } if (defaultEntry.isProducerFlowControl()) { - defSettings.setMaxSizeBytes(10240).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + defSettings.setMaxSizeBytes(10240000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); if (bservice.getSystemUsage().isSendFailIfNoSpace()) { defSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); }
