[
https://issues.apache.org/jira/browse/AMQ-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13146959#comment-13146959
]
Giovanni Toffetti commented on AMQ-1509:
----------------------------------------
Hi Dejan,
just by chance I started looking at this issue again. I don't think the problem
is fixed: as soon as there is more than one path between two brokers (each path
at least 2 hops long), message duplication occurs.
Here's a little example:
{code:title=FourBrokerTopicNetworkTest}
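import java.net.URI;
import java.util.HashMap;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;
public class FourBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
implements MessageListener {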
protected static final int MESSAGE_COUNT = 5;
public boolean dynamicOnly;
public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
addCombinationValues("dynamicOnly", new Object[] { true, false
});
}
/**
* A simple square topology: BrokerA <-> BrokerB, BrokerA <-> BrokerC,
* BrokerB <-> BrokerD, BrokerD <-> BrokerC
*/
public void testSquareConnectedBrokerNetwork2() throws Exception {
int networkTTL = 2;
boolean conduitSubs = true;
boolean dynamicOnly = false;
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
conduitSubs);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest,
"msgId > 0");
MessageConsumer clientB = createConsumer("BrokerB", dest,
"msgId > 0");
MessageConsumer clientC = createConsumer("BrokerC", dest,
"msgId > 0");
MessageConsumer clientD = createConsumer("BrokerD", dest,
"msgId > 0");
// let consumers propagate around the network
Thread.sleep(5000);
clientD.setMessageListener(this);
// Send messages
String[] brokers = { "BrokerA", "BrokerB", "BrokerC", "BrokerD"
};
HashMap<String, Object> props = new HashMap<String, Object>();
for (String broker : brokers) {
props.put("sender", broker);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
props.put("msgId", i);
sendMessages(broker, dest, 1, props);
}
}
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
System.out.println(msgsA.toString());
assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
}
/**
* A simple square topology: BrokerA <-> BrokerB, BrokerA <-> BrokerC,
* BrokerB <-> BrokerD, BrokerD <-> BrokerC
*/
public void testSquareConnectedBrokerNetwork() throws Exception {
int networkTTL = 2;
boolean conduitSubs = true;
boolean dynamicOnly = false;
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
conduitSubs);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
MessageConsumer clientD = createConsumer("BrokerD", dest);
// let consumers propagate around the network
Thread.sleep(5000);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
sendMessages("BrokerD", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
String options = "?persistent=false&useJmx=false";
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" +
options));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" +
options));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" +
options));
createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" +
options));
}
public static Test suite() {
return suite(FourBrokerTopicNetworkTest.class);
}
@Override
public void onMessage(Message message) {
try {
System.err.println(message.getStringProperty("sender")
+ " msgID:" + message.getIntProperty("msgId") );
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
{code}
I don't know if there's anything wrong with this test, or if I should use
different configurations of TTL, conduit, and dynamicOnly. I tested it with the
latest AMQ I could get (5.5.1).
As you can see, 25 messages are delivered instead of the expected 20. The reason
can be seen in the testSquareConnectedBrokerNetwork2 method: clientD prints
every message coming from BrokerA twice, because BrokerB and BrokerC each
forward it along a different path.
This is of course a major problem whenever a broker network has multiple
paths: message duplication becomes so severe that it basically kills the
whole thing.
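To make the arithmetic concrete, here is a rough sketch of the path counting. It is only a toy model and an assumption on my side, not how the network connectors are actually implemented: every broker hands a copy to each neighbour that is not already on the copy's path, until the copy has travelled networkTTL hops. The class and method names (SquareTopologySketch, copiesReceived, totalCopiesAt) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SquareTopologySketch {

    // Hypothetical adjacency list mirroring the bridges in the test:
    // A <-> B, A <-> C, B <-> D, C <-> D.
    static final Map<String, List<String>> LINKS = Map.of(
            "A", List.of("B", "C"),
            "B", List.of("A", "D"),
            "C", List.of("A", "D"),
            "D", List.of("B", "C"));

    /** Counts how many copies of one message published at origin reach each broker. */
    static Map<String, Integer> copiesReceived(String origin, int ttl) {
        Map<String, Integer> received = new LinkedHashMap<>();
        // Each in-flight copy is represented by the list of brokers it has visited.
        Deque<List<String>> inFlight = new ArrayDeque<>();
        inFlight.add(List.of(origin));
        while (!inFlight.isEmpty()) {
            List<String> path = inFlight.poll();
            String here = path.get(path.size() - 1);
            received.merge(here, 1, Integer::sum);
            int hops = path.size() - 1;
            if (hops >= ttl) {
                continue; // hop budget used up, do not forward further
            }
            for (String next : LINKS.get(here)) {
                if (path.contains(next)) {
                    continue; // never send a copy back to a broker already on its path
                }
                List<String> extended = new ArrayList<>(path);
                extended.add(next);
                inFlight.add(extended);
            }
        }
        return received;
    }

    /** Copies seen by a consumer on one broker when every broker publishes one message. */
    static int totalCopiesAt(String consumerBroker, int ttl) {
        int total = 0;
        for (String origin : LINKS.keySet()) {
            total += copiesReceived(origin, ttl).getOrDefault(consumerBroker, 0);
        }
        return total;
    }

    public static void main(String[] args) {
        int messageCount = 5; // MESSAGE_COUNT in the test
        int ttl = 2;          // networkTTL in the test
        System.out.println("copies of one BrokerA message: " + copiesReceived("A", ttl));
        System.out.println("messages seen on BrokerD: " + totalCopiesAt("D", ttl) * messageCount);
    }
}
```

In this model BrokerD gets two copies of each BrokerA message (one via BrokerB, one via BrokerC) and one copy from every other broker, so 5 * (2 + 1 + 1 + 1) = 25, which matches the count I see in the test.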
Please let me know if the test is correct, as I'd like to get some more insight
into why this is happening. My colleagues and I also have some ideas about the
correct way to fix it.
Regards,
g
> Duplicate topic messages received with network of brokers and selectors
> -----------------------------------------------------------------------
>
> Key: AMQ-1509
> URL: https://issues.apache.org/jira/browse/AMQ-1509
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker, Transport
> Affects Versions: 4.1.1
> Reporter: Howard Orner
> Assignee: Rob Davies
> Fix For: 5.3.0
>
> Attachments: ActiveMQActor.java
>
>
> If you create a network of two brokers, A and B, one publisher publishing to
> A, and n (where n is > 1) receivers with selectors, each receiver receives n
> messages for every 1 message sent. The key here is to have a selector. It
> would appear that the conduitSubscriptions flag does not work when using
> selectors. The conduit does not properly reconcile consumers if they have
> selectors. A suggested solution would be that, rather than processing each
> selector independently, the selectors should be OR'ed together, and if any
> selector evaluates to true then a single message should be sent to the other
> broker.
> In doing research, it would appear that this problem was introduced with bug
> fix AMQ-810. Another user reported it via email back to the assignee of
> AMQ-810 and a short dialog transpired. See
> http://www.mail-archive.com/[email protected]/msg05198.html.
>