Hello Bruce,

thanks for sharing precise info
That sounds really very strange and sounds like a bug of AMQ (or a
configuration issue).
A question, maybe trivial: which version of AMQ jar library are you using
on your Java producer?


*Distinti Saluti / *Kind Regards
M.G.



Il giorno lun 26 feb 2024 alle ore 05:08 Bruce Cooper <
br...@mechination.com.au> ha scritto:

> Now I have to rescind my previous comments.  It runs out NON_PERSISTENT
> messages from clients or the browser console are forwarded and consumed
> correctly, but PERSISTENT messages are not transmitted correctly.  This is
> true whether I send from the console, from JAVA using the JMS/OpenWire
> client or from Javascript rhea/AMQP.  I have turned on TRACE logging on the
> servers, but do not see any hints.
>
> Any suggestions?  I saw that Herbert suggested I had only set up two
> parallel brokers, but I followed the instructions at
>
> https://urldefense.com/v3/__https://activemq.apache.org/components/classic/documentation/shared-file-system-master-slave__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHFIZc52E$
>
>
>
>
> On Mon, 26 Feb 2024 at 10:00, Bruce Cooper <br...@mechination.com.au>
> wrote:
>
> > As a follow up, I can now confirm that using a different client (an AMQP
> > client written in javascript using the rhea library) works as expected.
> It
> > is just this Java process that is broken.  Any assistance you can provide
> > would be greatly appreciated.
> >
> > On Mon, 26 Feb 2024 at 09:21, Bruce Cooper <br...@mechination.com.au>
> > wrote:
> >
> >> Hi Marco,
> >>
> >> Thanks for your response.  I agree it is strange.  I now have some more
> >> information to share.  It now looks like it has something to do with my
> >> message producer, rather than core ActiveMQ behaviour.
> >>
> >> If I send messages using the ActiveMQ console on Broker A, they are
> >> transmitted and received correctly by the receiver.  If I send messages
> >> using the attached program, it only works until the failover happens.
> What
> >> is super strange is that the producer isn't connecting to the failover
> pair
> >> (B and C).  It is connected to the satellite (A).  Furthermore, that
> >> process is ephemeral.  It is restarted each time a send is made.
> >>
> >> My test is now a bit different than before. I first setup the brokers,
> >> and initiate a "failover"
> >>
> >>    1. Start Brokers B and C configured to use the same data directory so
> >>    that they are in a failover configuration.  Note that B is acting as
> the
> >>    primary, and C is in standby
> >>    2. Start Broker A, configured to point to Brokers B and C via the
> >>    connection string static:failover:(tcp://100.127.41.128:61616,tcp://
> >>    100.127.41.128:61617). Note that it connects to B
> >>    3. Start consumer to consume from Brokers B and C using the same
> >>    failover connection string. Note that it connects to B (in this
> example)
> >>    4. Start producer to send to broker A.  Note that messages are
> >>    received by the consumer.  Repeat to confirm that this continues to
> operate.
> >>    5. Send a message using the console, and note that this also works.
> >>    6. Stop Broker B.  Wait for both Broker A and the consumer to
> >>    reconnect to C, once it has obtained its locks and started.
> >>
> >> Once failover has occurred, proving that sending from the console still
> >> works:
> >>
> >>    1. Connect all processes together, do a failover, and wait for
> >>    reconnection
> >>    2. Send message in Broker A's Console, on the queue page
> >>    3. Message is received by consumer
> >>    4. See Enqueued and Dequeued metric go up (after a browser refresh) -
> >>    This indicates that the message has been dequeued by the network
> connection
> >>    and forwarded on to the HA pair.
> >>    5. Check the console on Broker C and note that its enqueue and
> >>    dequeue count have incremented.
> >>
> >> The above shows everything works as expected, now let us try my producer
> >> program
> >>
> >>    1. Run the main program below with argument "send" - This connects to
> >>    the satellite broker (Broker A) and publishes a message
> >>    2. Note that no message is received by the consumer connected to C
> >>    (once the failover has occurred)
> >>    3. Note that Broker A's console queue page again, and see that the
> >>    Enqueued and Dequeued metric have incremented as before.
> >>    4. Check the broker C queue console page, and note that this time the
> >>    Enqueued and Dequeued metric have _not_ been incremented.  It is as
> if A
> >>    thinks it has sent the message, but the pair C never receives it.
> >>
> >> I feel like I must have misconfigured my producer code in some fashion,
> >> but it was simply based on examples I found.  Is there something
> obvious I
> >> have missed?
> >>
> >>
> >> package au.com.mechination.integ.test;
> >>
> >> import java.util.Date;
> >>
> >> import javax.jms.Connection;
> >> import javax.jms.DeliveryMode;
> >> import javax.jms.Destination;
> >> import javax.jms.JMSException;
> >> import javax.jms.Message;
> >> import javax.jms.MessageConsumer;
> >> import javax.jms.MessageProducer;
> >> import javax.jms.Session;
> >> import javax.jms.TextMessage;
> >>
> >> import org.apache.activemq.ActiveMQConnectionFactory;
> >> import org.slf4j.Logger;
> >> import org.slf4j.LoggerFactory;
> >>
> >> /**
> >> * Hello world!
> >> *
> >> */
> >> public class Main {
> >>     Connection conn = null;
> >>     ActiveMQConnectionFactory connectionFactory;
> >>     private Session session;
> >>
> >>     static final Logger logger = LoggerFactory.getLogger(Main.class);
> >>
> >>     public static void main(String[] args) throws JMSException {
> >>         Main doer = new Main();
> >>
> >>         if (args.length > 0 && args[0].equals("send")) {
> >>             doer.send();
> >>         } else {
> >>             doer.receive();
> >>         }
> >>     }
> >>
> >>     public void connect(String url, String username, String password)
> >> throws JMSException {
> >>         final ActiveMQConnectionFactory connectionFactory = new
> >> ActiveMQConnectionFactory(url);
> >>
> >>         // Pass the sign-in credentials.
> >>         connectionFactory.setUserName(username);
> >>         connectionFactory.setPassword(password);
> >>
> >>         // Establish a connection for the producer.
> >>         this.conn = connectionFactory.createConnection();
> >>         conn.start();
> >>
> >>         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >>     }
> >>
> >>     void send() throws JMSException {
> >>         String url = "tcp://localhost:61616";
> >>         String username = "bruce";
> >>         String password = "sekrit";
> >>
> >>         System.out.println("Sender connecting to " + url);
> >>         connect(url, username, password);
> >>
> >>         // Create a queue named "MyQueue".
> >>         final Destination producerDestination = session.createQueue(
> >> "TestQueue");
> >>
> >>         // Create a producer from the session to the queue.
> >>         final MessageProducer producer = session.createProducer
> >> (producerDestination);
> >>         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
> >>         System.out.println("TTL is " + producer.getTimeToLive()); //
> >> Prints 0 - Unlimited
> >>
> >>         for (int i = 0; i < 1; i++) {
> >>             String text = "Hello from Amazon MQ!: " + i + " " + new Date
> >> ().toInstant().toString();
> >>             System.out.println("Sending message " + text);
> >>             TextMessage producerMessage = session.createTextMessage
> >> (text);
> >>             producer.send(producerMessage);
> >>         }
> >>
> >>         producer.close();
> >>
> >>         System.out.println("Send completed");
> >>         System.exit(0);
> >>     }
> >>
> >>     void receive() throws JMSException {
> >>         String url = "failover:(tcp://100.127.41.128:61616,tcp://
> >> 100.127.41.128:61617)";
> >>         String username = "bruce";
> >>         String password = "sekrit";
> >>
> >>         System.out.println("Receiver connecting to " + url);
> >>         connect(url, username, password);
> >>
> >>         final Destination consumerQueue =
> session.createQueue("TestQueue"
> >> );
> >>         final MessageConsumer consumer = session.createConsumer
> >> (consumerQueue);
> >>
> >>         while (true) {
> >>             final Message msg = consumer.receive(2500);
> >>             if (msg == null) {
> >>                 System.out.print(".");
> >>             } else if (msg instanceof TextMessage) {
> >>                 System.out.println("Received msg " + ((TextMessage)
> msg).
> >> getText());
> >>             } else {
> >>                 System.err.println("Something unexpected received");
> >>             }
> >>         }
> >>     }
> >> }
> >>
> >>
> >>
> >>
> >>
> >
> > --
> >
> https://urldefense.com/v3/__https://mechination.com.au/__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHayvR7KQ$
> > Ph: 0448 341 729
> >
>
>
> --
>
> https://urldefense.com/v3/__https://mechination.com.au/__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHayvR7KQ$
> Ph: 0448 341 729
>

Reply via email to