[ 
https://issues.apache.org/jira/browse/QPIDJMS-395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522592#comment-16522592
 ] 

Timothy Bish commented on QPIDJMS-395:
--------------------------------------

Fix for handling connection remote closed and resending pending messages:
https://git1-us-west.apache.org/repos/asf?p=qpid-jms.git;a=commit;h=fe0307b58beb7ec344f728e34a7ca0e3ef103add

{noformat}
QPIDJMS-395 Resend message in flight when remote closed

When the remote closes the connection and an inflight send
is outstanding we should handle the close and resend those
messages that are still awaiting dispositions in the same
manner as we do when the connection unexpectedly drops when
using the Failover feature.
{noformat}

> connection:forced leads to JMSException even though reconnect is enabled
> ------------------------------------------------------------------------
>
>                 Key: QPIDJMS-395
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-395
>             Project: Qpid JMS
>          Issue Type: Bug
>          Components: qpid-jms-client
>    Affects Versions: 0.33.0
>            Reporter: Jiri Daněk
>            Assignee: Timothy Bish
>            Priority: Major
>             Fix For: 0.34.0
>
>
> Based on 
> http://qpid.2158936.n2.nabble.com/Reconnect-and-amqp-connection-forced-td7659043.html,
>  I believe that reconnect:force error should not be propagated to library 
> user and the library should silently reconnect. This does not happen in the 
> test below, when I am sending messages fast--I do get exception caused by 
> connection:forced. Notice the commented out sleep() call.
> In ActiveMQ Artemis testsuite:
> {noformat}
> diff --git 
> a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java
>  
> b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java
> index 81c28855ef..888171227b 100644
> --- 
> a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java
> +++ 
> b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFailoverEndpointDiscoveryTest.java
> @@ -86,6 +86,32 @@ public class AmqpFailoverEndpointDiscoveryTest extends 
> FailoverTestBase {
>        }
>     }
>  
> +   @Test(timeout = 120000)
> +   public void testReconnectWhileSendingWithAMQP() throws Exception {
> +      JmsConnectionFactory factory = getJmsConnectionFactory();
> +      try (Connection connection = factory.createConnection()) {
> +         connection.start();
> +         Session session = connection.createSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +         javax.jms.Queue queue = session.createQueue(ADDRESS.toString());
> +         MessageProducer producer = session.createProducer(queue);
> +         Thread t = new Thread(() -> {
> +            try {
> +               while(true) {
> +                  System.out.println("sending message");
> +                  producer.send(session.createTextMessage("hello before 
> failover"));
> +//                  Thread.sleep(1000);  // comment out to send messages 
> quickly
> +               }
> +            } catch (Exception e ) {
> +               e.printStackTrace();
> +            }
> +         });
> +         t.start();
> +         Thread.sleep(2000);  // simpler to read than actual synchronization
> +         liveServer.crash(true, true);
> +         Thread.sleep(2000);
> +      }
> +   }
> +
>     private JmsConnectionFactory getJmsConnectionFactory() {
>        if (protocol == 0) {
>           return new 
> JmsConnectionFactory("failover:(amqp://localhost:61616)");
> {noformat}
> The above will print (only print, there aren't asserts)
> {noformat}
> javax.jms.JMSException: Received error from remote peer without description 
> [condition = amqp:connection:forced]
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:164)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:117)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:262)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:971)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:105)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:854)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The PN_TRACE_FRM log for this is
> {noformat}
> [...]
> [1476077082:1] -> Transfer{handle=0, deliveryId=221, deliveryTag=\x00, 
> messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, 
> resume=false, aborted=false, batchable=false} (179) 
> "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00\x5c\x00\x00\x00\x0a\xa11ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1-222@\xa1\x13FailoverTestAddress@@@@@@\x83\x00\x00\x01d\x12f\x06\xac\x00Sw\xa1\x15hello
>  before failover"
> [913923932:1] <- Transfer{handle=0, deliveryId=221, deliveryTag=\x00, 
> messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, 
> resume=false, aborted=false, batchable=false} (179) 
> "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00\x5c\x00\x00\x00\x0a\xa11ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1-222@\xa1\x13FailoverTestAddress@@@@@@\x83\x00\x00\x01d\x12f\x06\xac\x00Sw\xa1\x15hello
>  before failover"
> [913923932:1] -> Disposition{role=RECEIVER, first=221, last=221, 
> settled=true, state=Accepted{}, batchable=false}
> [1476077082:1] <- Disposition{role=RECEIVER, first=221, last=221, 
> settled=true, state=Accepted{}, batchable=false}
> sending message
> [1476077082:1] -> Transfer{handle=0, deliveryId=222, deliveryTag=\x00, 
> messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, 
> resume=false, aborted=false, batchable=false} (179) 
> "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00\x5c\x00\x00\x00\x0a\xa11ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1-223@\xa1\x13FailoverTestAddress@@@@@@\x83\x00\x00\x01d\x12f\x06\xb0\x00Sw\xa1\x15hello
>  before failover"
> [913923932:0] -> Close{error=Error{condition=amqp:connection:forced, 
> description='null', info=null}}
> [913923932:1] <- Transfer{handle=0, deliveryId=222, deliveryTag=\x00, 
> messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, 
> resume=false, aborted=false, batchable=false} (179) 
> "\x00Sp\xc0\x02\x01A\x00Sr\xc1)\x04\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x05\x00Ss\xd0\x00\x00\x00\x5c\x00\x00\x00\x0a\xa11ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1-223@\xa1\x13FailoverTestAddress@@@@@@\x83\x00\x00\x01d\x12f\x06\xb0\x00Sw\xa1\x15hello
>  before failover"
> [1476077082:0] <- Close{error=Error{condition=amqp:connection:forced, 
> description='null', info=null}}
> [1476077082:0] -> Close{error=null}
> [913923932:0] <- Close{error=null}
> javax.jms.JMSException: Received error from remote peer without description 
> [condition = amqp:connection:forced]
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:164)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:117)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:262)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:971)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:105)
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:854)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> [Thread-8 
> (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@343570b7)]
>  12:17:13,704 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: 
> Connection failure has been detected: AMQ119015: The connection was 
> disconnected because of server shutdown [code=DISCONNECTED]
> [Thread-6 (ActiveMQ-client-global-threads)] 12:17:13,704 WARN  
> [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure has 
> been detected: AMQ119015: The connection was disconnected because of server 
> shutdown [code=DISCONNECTED]
> [Thread-2] 12:17:13,731 INFO  [org.apache.activemq.artemis.core.server] 
> AMQ221002: Apache ActiveMQ Artemis Message Broker version 
> identity=AmqpFailoverEndpointDiscoveryTest/liveServer,version=2.7.0-SNAPSHOT 
> [c1f0e25e-72e0-11e8-acbd-b6cd2f5fd8dd] stopped, uptime 4.323 seconds
> [Thread-6 (ActiveMQ-client-global-threads)] 12:17:13,737 WARN  
> [org.apache.activemq.artemis.core.client] AMQ212004: Failed to connect to 
> server.
> [AMQ119000: Activation for server 
> ActiveMQServerImpl::AmqpFailoverEndpointDiscoveryTest/backupServers] 
> 12:17:13,831 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: 
> Started EPOLL Acceptor at localhost:61617 for protocols 
> [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
> [AMQ119000: Activation for server 
> ActiveMQServerImpl::AmqpFailoverEndpointDiscoveryTest/backupServers] 
> 12:17:13,831 INFO  [org.apache.activemq.artemis.core.server] AMQ221010: 
> Backup Server is now live
> [737743497:0] -> Open{ 
> containerId='ID:b8e3f0c3-0810-41d0-a087-c69de89f017f:1', 
> hostname='localhost', maxFrameSize=1048576, channelMax=32767, 
> idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, 
> offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, 
> DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, 
> version=0.32.0, platform=JVM: 1.8.0_172, 25.172-b11, Oracle Corporation, OS: 
> Linux, 4.14.43, amd64}}
> [1583552999:0] <- Open{ 
> containerId='ID:b8e3f0c3-0810-41d0-a087-c69de89f017f:1', 
> hostname='localhost', maxFrameSize=1048576, channelMax=32767, 
> idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, 
> offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, 
> DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, 
> version=0.32.0, platform=JVM: 1.8.0_172, 25.172-b11, Oracle Corporation, OS: 
> Linux, 4.14.43, amd64}}
> [1583552999:0] -> Open{ containerId='localhost', hostname='null', 
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, 
> outgoingLocales=null, incomingLocales=null, 
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, 
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, 
> properties={product=apache-activemq-artemis, version=2.7.0-SNAPSHOT}}
> [737743497:0] <- Open{ containerId='localhost', hostname='null', 
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, 
> outgoingLocales=null, incomingLocales=null, 
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, 
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, 
> properties={product=apache-activemq-artemis, version=2.7.0-SNAPSHOT}}
> [737743497:0] -> Begin{remoteChannel=null, nextOutgoingId=1, 
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1583552999:0] <- Begin{remoteChannel=null, nextOutgoingId=1, 
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1583552999:0] -> Begin{remoteChannel=0, nextOutgoingId=1, 
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [737743497:0] <- Begin{remoteChannel=0, nextOutgoingId=1, 
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [737743497:1] -> Begin{remoteChannel=null, nextOutgoingId=1, 
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1583552999:1] <- Begin{remoteChannel=null, nextOutgoingId=1, 
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1583552999:1] -> Begin{remoteChannel=1, nextOutgoingId=1, 
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [737743497:1] <- Begin{remoteChannel=1, nextOutgoingId=1, 
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [737743497:1] -> 
> Attach{name='qpid-jms:sender:ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1:FailoverTestAddress',
>  handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, 
> source=Source{address='ID:bc37e33f-5706-4fe3-b953-c5be80e38b52:1:1:1', 
> durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, 
> dynamicNodeProperties=null, distributionMode=null, filter=null, 
> defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, 
> amqp:released:list, amqp:modified:list], capabilities=null}, 
> target=Target{address='FailoverTestAddress', durable=NONE, 
> expiryPolicy=SESSION_END, timeout=0, dynamic=false, 
> dynamicNodeProperties=null, capabilities=[queue]}, unsettled=null, 
> incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, 
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [...]
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to