[ https://issues.apache.org/jira/browse/AMQ-3077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12971206#action_12971206 ]
Eric commented on AMQ-3077:
---------------------------
public Response processBrokerInfo(BrokerInfo info) {
    if (info.isSlaveBroker()) {
        BrokerService bService = connector.getBrokerService();
        // Do we only support passive slaves - or does the slave want to be
        // passive ?
        boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
        if (passive == false) {
            // stream messages from this broker (the master) to
            // the slave
            MutableBrokerFilter parent =
                (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
            masterBroker = new MasterBroker(parent, transport);
            masterBroker.startProcessing();
        }
        LOG.info((passive ? "Passive" : "Active") + " Slave Broker "
            + info.getBrokerName() + " is attached");
        bService.slaveConnectionEstablished();
    } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
        // so this TransportConnection is the rear end of a network bridge
        // We have been requested to create a two way pipe ...
        try {
            // We first look if existing network connection already exists
            // for the same broker Id
            // It's possible in case of brief network fault to have this
            // transport connector side of the connection always active
            // and the duplex network connector side wanting to open a new
            // one
            // In this case, the old connection must be broken
            BrokerId remoteBrokerId = info.getBrokerId();
            setDuplexRemoteBrokerId(remoteBrokerId);
            CopyOnWriteArrayList<TransportConnection> connections =
                this.connector.getConnections();
            for (Iterator<TransportConnection> iter = connections.iterator();
                    iter.hasNext();) {
                TransportConnection c = iter.next();
                if ((c != this)
                        && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
                    LOG.warn("An existing duplex active connection already "
                        + "exists for this broker (" + remoteBrokerId
                        + "). Stopping it.");
                    c.stop();
                }
            }
            Properties properties =
                MarshallingSupport.stringToProperties(info.getNetworkProperties());
            Map<String, String> props = createMap(properties);
            NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
            IntrospectionSupport.setProperties(config, props, "");
            config.setBrokerName(broker.getBrokerName());
            URI uri = broker.getVmConnectorURI();
            HashMap<String, String> map =
                new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
            uri = URISupport.createURIWithQuery(uri,
                URISupport.createQueryString(map));
            Transport localTransport = TransportFactory.connect(uri);
            Transport remoteBridgeTransport = new ResponseCorrelator(transport);
            duplexBridge = NetworkBridgeFactory.createBridge(config,
                localTransport, remoteBridgeTransport);
            duplexBridge.setBrokerService(broker.getBrokerService());
            // now turn duplex off this side
            info.setDuplexConnection(false);
            duplexBridge.setCreatedByDuplex(true);
            duplexBridge.duplexStart(this, brokerInfo, info);
            LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
            return null;
        } catch (TransportDisposedIOException e) {
            LOG.warn("Duplex Bridge back to " + info.getBrokerName()
                + " was correctly stopped before it was correctly started.");
            return null;
        } catch (Exception e) {
            LOG.error("Creating duplex network bridge", e);
        }
    }
    // We only expect to get one broker info command per connection
    if (this.brokerInfo != null) {
        LOG.warn("Unexpected extra broker info command received: " + info);
    }
    this.brokerInfo = info;
    broker.addBroker(this, info);
    networkConnection = true;
    List<TransportConnectionState> connectionStates = listConnectionStates();
    for (TransportConnectionState cs : connectionStates) {
        cs.getContext().setNetworkConnection(true);
    }
    return null;
}
=> broker.addBroker(this, info)
The "info" object seems to become very large when there are many brokers with
many network connections between them. (That is our case!)
Why is it now necessary (since 5.4) for a "transport connector side" broker to
export its already established peer connections to each new "initiator side
(network connector)" broker?
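To make the concern concrete, here is a minimal, self-contained sketch; it is
not ActiveMQ code. The Info class, the fan-out of 4 and the depth of 8 are
hypothetical stand-ins, chosen only to show how quickly the number of nested
infos grows when every info embeds its peers' infos. The second half assumes,
without having checked it, that the marshalling stream tracks its position in
a 16-bit short; it only demonstrates that such a counter wraps to a negative
value, and that a negative array index raises ArrayIndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedInfoSketch {

    // Hypothetical stand-in for a broker info that embeds its peers' infos.
    static final class Info {
        final List<Info> peers = new ArrayList<Info>();
    }

    // Builds a tree where every info embeds `fanOut` peer infos,
    // `depth` levels deep.
    static Info build(int fanOut, int depth) {
        Info info = new Info();
        if (depth > 0) {
            for (int i = 0; i < fanOut; i++) {
                info.peers.add(build(fanOut, depth - 1));
            }
        }
        return info;
    }

    // Counts every info a recursive marshaller would have to visit.
    static long countNodes(Info info) {
        long n = 1;
        for (Info peer : info.peers) {
            n += countNodes(peer);
        }
        return n;
    }

    // Increments a 16-bit position `steps` times; a short wraps silently.
    static short advance(short pos, int steps) {
        for (int i = 0; i < steps; i++) {
            pos++;
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println("infos visited: " + countNodes(build(4, 8)));

        // One step past Short.MAX_VALUE wraps to Short.MIN_VALUE (-32768).
        short pos = advance(Short.MAX_VALUE, 1);
        System.out.println("position after overflow: " + pos);

        byte[] data = new byte[16];
        try {
            data[pos] = 1; // negative index
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("caught: " + e);
        }
    }
}
```

If the real structure behaves anything like this, even a modest number of
brokers that each export all their peers could push a 16-bit counter past its
limit.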
Eric-AWL
> ArraysIndexOutOfBoundsException : -32768 in "BrokerService[xxx] Task" thread
> ----------------------------------------------------------------------------
>
> Key: AMQ-3077
> URL: https://issues.apache.org/jira/browse/AMQ-3077
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.4.0, 5.4.1
> Environment: Linux, Java 6, Fuse Message Brokers 5.4.0-1 or 5.4.1-1
> Reporter: Eric
>
> Exception in thread "BrokerService[SUP-tterdp31v] Task" java.lang.ArrayIndexOutOfBoundsException: -32767
>     at org.apache.activemq.openwire.BooleanStream.writeBoolean(BooleanStream.java:54)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:377)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:397)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:131)
>     at org.apache.activemq.openwire.v6.BaseDataStreamMarshaller.tightMarshalObjectArray1(BaseDataStreamMarshaller.java:357)
>     at org.apache.activemq.openwire.v6.BrokerInfoMarshaller.tightMarshal1(BrokerInfoMarshaller.java:106)
>     at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:228)
>     at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:181)
>     at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:255)
>     at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
>     at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>     at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1249)
>     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:810)
>     at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:846)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
> Currently this exception is not written to the ActiveMQ logs but to standard
> output.
> It seems to happen when network-of-brokers connections are stopped, but
> I'm not sure at all. (The number of "bridge to ... stopped" messages seems to
> be the same as the number of exceptions detected.)
> So I need to dig into this problem to be more precise.
> Eric-AWL