ARTEMIS-1134: Close connection if error caught during event processing. If an error escapes into the event processing layer, we close the connection with an error condition to avoid the client becoming stuck waiting for a response from the broker and the broker side being left in an unknown state.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/970782d3 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/970782d3 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/970782d3 Branch: refs/heads/master Commit: 970782d36af7a3ba5cc52fb73fe5718fc0f34021 Parents: 6a251ee Author: Timothy Bish <[email protected]> Authored: Fri Apr 28 16:21:51 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue May 2 13:01:29 2017 -0400 ---------------------------------------------------------------------- .../protocol/amqp/proton/handler/ProtonHandler.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/970782d3/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index e3cb730..eb95dec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -25,8 +25,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import 
org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; @@ -34,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -43,6 +42,9 @@ import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + public class ProtonHandler extends ProtonInitializable { private static final Logger log = Logger.getLogger(ProtonHandler.class); @@ -89,7 +91,6 @@ public class ProtonHandler extends ProtonInitializable { connection.collect(collector); } - public long tick(boolean firstTick) { lock.lock(); try { @@ -141,7 +142,6 @@ public class ProtonHandler extends ProtonInitializable { Thread.currentThread().interrupt(); return false; } - } public Transport getTransport() { @@ -168,7 +168,6 @@ public class ProtonHandler extends ProtonInitializable { } this.serverSasl.server(); serverSasl.setMechanisms(names); - } public void flushBytes() { @@ -348,7 +347,12 @@ public class ProtonHandler extends ProtonInitializable { Events.dispatch(ev, h); } catch (Exception e) { log.warn(e.getMessage(), e); - connection.setCondition(new ErrorCondition()); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + connection.setCondition(error); + connection.close(); } }
