producer refactor
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95f76e2e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95f76e2e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95f76e2e Branch: refs/heads/refactor-openwire Commit: 95f76e2ec0fe4b30d0f43a88d7aa9ea1739d0073 Parents: 31bc86f Author: Clebert Suconic <[email protected]> Authored: Tue Mar 1 22:31:39 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../remoting/impl/netty/NettyConnection.java | 4 + .../artemis/spi/core/remoting/Connection.java | 6 + .../protocol/openwire/OpenWireConnection.java | 163 ++++------- .../openwire/OpenWireProtocolManager.java | 15 +- .../core/protocol/openwire/SendingResult.java | 57 ---- .../core/protocol/openwire/amq/AMQSession.java | 281 +++++++++---------- .../openwire/impl/OpenWireServerCallback.java | 75 ----- .../artemis/core/paging/PagingStore.java | 2 + .../core/remoting/impl/invm/InVMConnection.java | 6 + .../FailoverConsumerOutstandingCommitTest.java | 25 +- .../InvestigationOpenwireTest.java | 27 ++ .../storage/PersistMultiThreadTest.java | 4 + 12 files changed, 255 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 9268699..3f10227 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -100,6 +100,10 @@ public class NettyConnection implements Connection { } // Connection implementation ---------------------------- + @Override + public void setAutoRead(boolean autoRead) { + channel.config().setAutoRead(autoRead); + } @Override public synchronized boolean isWritable(ReadyListener callback) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index ed10113..4352d49 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -44,6 +44,12 @@ public interface Connection { void fireReady(boolean ready); /** + * This will disable reading from the channel. + * This is basically the same as blocking the reading. + * */ + void setAutoRead(boolean autoRead); + + /** * returns the unique id of this wire. * * @return the id http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 0fd8dc2..1e1e953 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; import javax.jms.InvalidDestinationException; import javax.jms.JMSSecurityException; -import javax.jms.ResourceAllocationException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -270,22 +269,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se catch (Exception e) { ActiveMQServerLogger.LOGGER.debug(e); - Response resp; - if (e instanceof ActiveMQSecurityException) { - resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else if (e instanceof ActiveMQNonExistentQueueException) { - resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); - } - else { - resp = new ExceptionResponse(e); - } - try { - dispatch(resp); - } - catch (IOException e2) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); - } + sendException(e); + } + } + + public void sendException(Exception e) { + Response resp; + if (e instanceof ActiveMQSecurityException) { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else if (e instanceof ActiveMQNonExistentQueueException) { + resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); + } + else { + resp = new ExceptionResponse(e); + } + try { + dispatch(resp); + } + catch (IOException e2) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); } } @@ -371,75 +374,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } - public void dispatchAsync(Command message) { - if (!stopping.get()) { - dispatchSync(message); - } - else { - if (message.isMessageDispatch()) { - MessageDispatch md = (MessageDispatch) message; - TransmitCallback sub = md.getTransmitCallback(); - protocolManager.postProcessDispatch(md); - if (sub != null) { - sub.onFailure(); - } - } - } - } - - public void dispatchSync(Command message) { - try { - processDispatch(message); - } - catch (IOException e) { - serviceExceptionAsync(e); - } - } - - public void serviceExceptionAsync(final IOException e) { - if (asyncException.compareAndSet(false, true)) { - // TODO: Why this is not through an executor? - new Thread("Async Exception Handler") { - @Override - public void run() { - serviceException(e); - } - }.start(); - } + public void dispatchAsync(Command message) throws Exception { + dispatchSync(message); } - public void serviceException(Throwable e) { - // are we a transport exception such as not being able to dispatch - // synchronously to a transport - if (e instanceof IOException) { - serviceTransportException((IOException) e); - } - else if (!stopping.get() && !inServiceException) { - inServiceException = true; - try { - ConnectionError ce = new ConnectionError(); - ce.setException(e); - dispatchAsync(ce); - } - finally { - inServiceException = false; - } - } + public void dispatchSync(Command message) throws Exception { + processDispatch(message); } - public void serviceTransportException(IOException e) { - /* - * deal with it later BrokerService bService = - * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure()) - * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) { - * LOG.error("Slave has exception: {} shutting down master now.", - * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception - * ex) { LOG.warn("Failed to stop the master", ex); } } } } if - * (!stopping.get() && !pendingStop) { transportException.set(e); if - * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: " - * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { - * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); } - */ + public void serviceException(Throwable e) throws Exception { + ConnectionError ce = new ConnectionError(); + ce.setException(e); + dispatchAsync(ce); } protected void dispatch(Command command) throws IOException { @@ -570,7 +516,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - private void disconnect(ActiveMQException me, String reason, boolean fail) { + private void disconnect(ActiveMQException me, String reason, boolean fail) { if (context == null || destroyed) { return; @@ -596,8 +542,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (command != null && command.isResponseRequired()) { Response lastResponse = new Response(); lastResponse.setCorrelationId(command.getCommandId()); - dispatchSync(lastResponse); - context.setDontSendReponse(true); + try { + dispatchSync(lastResponse); + } + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } } } @@ -632,12 +582,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.context; } - public void updateClient(ConnectionControl control) { - // if (!destroyed && context.isFaultTolerant()) { + public void updateClient(ConnectionControl control) throws Exception { if (protocolManager.isUpdateClusterClients()) { dispatchAsync(control); } - // } } public AMQConnectionContext initContext(ConnectionInfo info) { @@ -1063,9 +1011,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - protocolManager.commitTransactionOnePhase(info); - TransactionId txId = info.getTransactionId(); - txMap.remove(txId); + new Exception("commit").printStackTrace(); + try { + protocolManager.commitTransactionOnePhase(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } return null; } @@ -1150,33 +1105,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se AMQSession session = getSession(producerId.getParentId()); - SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); - if (result.isBlockNextSend()) { - if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) { - // TODO see logging - throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); - } - - if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) { - //in that case don't send the response - //this will force the client to wait until - //the response is got. - context.setDontSendReponse(true); - } - else { - //hang the connection until the space is available - session.blockingWaitForSpace(producerExchange, result); - } - } - else if (sendProducerAck) { - // TODO-now: send through OperationContext - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - OpenWireConnection.this.dispatchAsync(ack); - } - + session.send(producerInfo, messageSend, sendProducerAck); return null; } + @Override public Response processMessageAck(MessageAck ack) throws Exception { AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 51c4bec..7445960 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerE import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; @@ -192,7 +193,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl for (OpenWireConnection c : this.connections) { ConnectionControl control = newConnectionControl(); - c.updateClient(control); + try { + c.updateClient(control); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + c.sendException(e); + } } } @@ -365,7 +372,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl context.setProducerFlowControl(false); AMQSession sess = context.getConnection().getAdvisorySession(); if (sess != null) { - sess.send(producerExchange, advisoryMessage, false); + sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false); } } finally { @@ -515,7 +522,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl server.destroyQueue(subQueueName); } - public void sendBrokerInfo(OpenWireConnection connection) { + public void sendBrokerInfo(OpenWireConnection connection) throws Exception { BrokerInfo brokerInfo = new BrokerInfo(); brokerInfo.setBrokerName(server.getIdentity()); brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID())); @@ -525,7 +532,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl //cluster support yet to support brokerInfo.setPeerBrokerInfos(null); - connection.dispatchAsync(brokerInfo); + connection.dispatch(brokerInfo); } public void setRebalanceClusterClients(boolean rebalance) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java deleted file mode 100644 index 0e21ca4..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.openwire; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; - -public class SendingResult { - - private boolean blockNextSend; - private PagingStoreImpl blockPagingStore; - private SimpleString blockingAddress; - - public void setBlockNextSend(boolean block) { - this.blockNextSend = block; - } - - public boolean isBlockNextSend() { - return this.blockNextSend; - } - - public void setBlockPagingStore(PagingStoreImpl store) { - this.blockPagingStore = store; - } - - public PagingStoreImpl getBlockPagingStore() { - return this.blockPagingStore; - } - - public void setBlockingAddress(SimpleString address) { - this.blockingAddress = address; - } - - public SimpleString getBlockingAddress() { - return this.blockingAddress; - } - - public boolean isSendFailIfNoSpace() { - AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy(); - return policy == AddressFullMessagePolicy.FAIL; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 4db5967..b68861e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,8 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; +import javax.jms.ResourceAllocationException; import javax.transaction.xa.Xid; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -26,41 +26,38 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerAck; -import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; -import org.apache.activemq.artemis.core.protocol.openwire.SendingResult; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.core.transaction.impl.XidImpl; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; @@ -106,6 +103,10 @@ public class AMQSession implements SessionCallback { this.converter = new OpenWireMessageConverter(marshaller.copy()); } + public OpenWireMessageConverter getConverter() { + return converter; + } + public void initialize() { String name = sessInfo.getSessionId().toString(); String username = connInfo.getUserName(); @@ -226,25 +227,10 @@ public class AMQSession implements SessionCallback { } - public AMQServerSession getCoreSession() { - return this.coreSession; - } - - public ActiveMQServer getCoreServer() { - return this.server; - } - - public void removeConsumer(long consumerId) throws Exception { - boolean failed = !(this.txId != null || this.isTx); - - coreSession.amqCloseConsumer(consumerId, failed); - consumers.remove(consumerId); - } - public SendingResult send(AMQProducerBrokerExchange producerExchange, - Message messageSend, - boolean sendProducerAck) throws Exception { - SendingResult result = new SendingResult(); + public void send(final ProducerInfo producerInfo, + final Message messageSend, + boolean sendProducerAck) throws Exception { TransactionId tid = messageSend.getTransactionId(); if (tid != null) { resetSessionTx(tid); @@ -262,39 +248,128 @@ public class AMQSession implements SessionCallback { actualDestinations = new ActiveMQDestination[]{destination}; } - for (ActiveMQDestination dest : actualDestinations) { + ServerMessage originalCoreMsg = getConverter().inbound(messageSend); + + /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did + * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to + * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the + * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */ + if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) { + originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); + } + + Runnable runnable; + + if (sendProducerAck) { + runnable = new Runnable() { + public void run() { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchSync(ack); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + + } + }; + } + else { + final Connection transportConnection = connection.getTransportConnection(); + +// new Exception("Setting to false").printStackTrace(); + + if (transportConnection == null) { + // I don't think this could happen, but just in case, avoiding races + runnable = null; + } + else { + runnable = new Runnable() { + public void run() { + transportConnection.setAutoRead(true); + } + }; + } + } + - ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend); + internalSend(actualDestinations, originalCoreMsg, runnable); + } + + private void internalSend(ActiveMQDestination[] actualDestinations, + ServerMessage originalCoreMsg, + final Runnable onComplete) throws Exception { + + Runnable runToUse; + + if (actualDestinations.length <= 1 || onComplete == null) { + // if onComplete is null, this will be null ;) + runToUse = onComplete; + } + else { + final AtomicInteger count = new AtomicInteger(actualDestinations.length); + runToUse = new Runnable() { + @Override + public void run() { + if (count.decrementAndGet() == 0) { + onComplete.run(); + } + } + }; + } - /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did - * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to - * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the - * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */ - if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) { - coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); + SimpleString[] addresses = new SimpleString[actualDestinations.length]; + PagingStore[] pagingStores = new PagingStore[actualDestinations.length]; + + // We fillup addresses, pagingStores and we will throw failure if that's the case + for (int i = 0; i < actualDestinations.length; i++) { + ActiveMQDestination dest = actualDestinations[i]; + addresses[i] = OpenWireUtil.toCoreAddress(dest); + pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]); + if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) { + throw new ResourceAllocationException("Queue is full"); } - SimpleString address = OpenWireUtil.toCoreAddress(dest); - coreMsg.setAddress(address); + } + + + for (int i = 0; i < actualDestinations.length; i++) { + + ServerMessage coreMsg = originalCoreMsg.copy(); - PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address); + coreMsg.setAddress(addresses[i]); + PagingStore store = pagingStores[i]; - // TODO: Improve this, tested with ProducerFlowControlSendFailTest if (store.isFull()) { - result.setBlockNextSend(true); - result.setBlockPagingStore(store); - result.setBlockingAddress(address); - //now we hold this message send until the store has space. - //we do this by put it in a scheduled task - ScheduledExecutorService scheduler = server.getScheduledPool(); - Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId()); - scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS); + connection.getTransportConnection().setAutoRead(false); } - else { - coreSession.send(coreMsg, false); + + getCoreSession().send(coreMsg, false); + + if (runToUse != null) { + // if the timeout is >0, it will wait this much milliseconds + // before running the the runToUse + // this will eventually unblock blocked destinations + // playing flow control + store.checkMemory(runToUse); } } - return result; + } + + public AMQServerSession getCoreSession() { + return this.coreSession; + } + + public ActiveMQServer getCoreServer() { + return this.server; + } + + public void removeConsumer(long consumerId) throws Exception { + boolean failed = !(this.txId != null || this.isTx); + + coreSession.amqCloseConsumer(consumerId, failed); + consumers.remove(consumerId); } public WireFormat getMarshaller() { @@ -467,92 +542,4 @@ public class AMQSession implements SessionCallback { } } } - - private class SendRetryTask implements Runnable { - - private ServerMessage coreMsg; - private AMQProducerBrokerExchange producerExchange; - private boolean sendProducerAck; - private int msgSize; - private int commandId; - - public SendRetryTask(ServerMessage coreMsg, - AMQProducerBrokerExchange producerExchange, - boolean sendProducerAck, - int msgSize, - int commandId) { - this.coreMsg = coreMsg; - this.producerExchange = producerExchange; - this.sendProducerAck = sendProducerAck; - this.msgSize = msgSize; - this.commandId = commandId; - } - - @Override - public void run() { - synchronized (AMQSession.this) { - try { - // check pageStore - SimpleString address = coreMsg.getAddress(); - PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address); - if (store.isFull()) { - // if store is still full, schedule another - server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS); - } - else { - // now send the message again. - coreSession.send(coreMsg, false); - - if (sendProducerAck) { - ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize); - connection.dispatchAsync(ack); - } - else { - Response response = new Response(); - response.setCorrelationId(commandId); - connection.dispatchAsync(response); - } - } - } - catch (Exception e) { - ExceptionResponse response = new ExceptionResponse(e); - response.setCorrelationId(commandId); - connection.dispatchAsync(response); - } - } - - } - } - - // TODO: remove this, we should do the same as we do on core for blocking - public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange, - SendingResult result) throws IOException { - - - new Exception("blocking").printStackTrace(); - long start = System.currentTimeMillis(); - long nextWarn = start; - producerExchange.blockingOnFlowControl(true); - - AMQConnectionContext context = producerExchange.getConnectionContext(); - PagingStoreImpl store = result.getBlockPagingStore(); - - //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL - long blockedProducerWarningInterval = 30000; - ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId(); - - while (store.isFull()) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } - - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000)); - nextWarn = now + blockedProducerWarningInterval; - } - } - producerExchange.blockingOnFlowControl(false); - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java deleted file mode 100644 index 8ab3815..0000000 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/impl/OpenWireServerCallback.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.protocol.openwire.impl; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; - -public class OpenWireServerCallback implements SessionCallback { - - @Override - public boolean hasCredits(ServerConsumer consumerID) { - return false; - } - - @Override - public void sendProducerCreditsMessage(int credits, SimpleString address) { - - } - - @Override - public void sendProducerCreditsFailMessage(int credits, SimpleString address) { - - } - - @Override - public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) { - return 0; - } - - @Override - public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) { - return 0; - } - - @Override - public int sendLargeMessageContinuation(ServerConsumer consumerID, - byte[] body, - boolean continues, - boolean requiresResponse) { - return 0; - } - - @Override - public void closed() { - - } - - @Override - public void disconnect(ServerConsumer consumerId, String queueName) { - - } - - @Override - public boolean isWritable(ReadyListener callback) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index e831966..566b91a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent { boolean checkMemory(Runnable runnable); + boolean isFull(); + /** * Write lock the PagingStore. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 70d6289..db61f89 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -138,6 +138,12 @@ public class InVMConnection implements Connection { } @Override + public void setAutoRead(boolean autoRead) { + // nothing to be done on the INVM. + // maybe we could eventually implement something, but not needed now + } + + @Override public ActiveMQBuffer createTransportBuffer(final int size) { return ActiveMQBuffers.dynamicBuffer(size); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index 5a160ab..e44a490 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -170,20 +170,21 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe @Test @BMRules( - rules = {@BMRule( - name = "set no return response", - targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor", - targetMethod = "processCommitTransactionOnePhase", - targetLocation = "ENTRY", - binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", - action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), + rules = { + @BMRule( + name = "set no return response", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor", + targetMethod = "processCommitTransactionOnePhase", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule( - name = "stop broker before commit", - targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl", - targetMethod = "commit", - targetLocation = "ENTRY", - action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")}) + name = "stop broker before commit", + targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl", + targetMethod = "commit", + targetLocation = "ENTRY", + action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")}) public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception { doTestFailoverConsumerOutstandingSendTx(false); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java index 914a8e1..1599a2c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java @@ -29,6 +29,7 @@ import javax.transaction.xa.XAResource; import java.util.Collection; import java.util.LinkedList; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.junit.Assert; import org.junit.Test; @@ -59,6 +60,30 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest { } @Test + public void testProducerFlowControl() throws Exception { + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); + + factory.setProducerWindowSize(1024 * 64); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("test")); + + + connection.close(); + + System.err.println("Done!!!"); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + @Test public void testAutoAck() throws Exception { try { @@ -108,6 +133,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest { MessageProducer producer = session.createProducer(queue); MessageConsumer consumer = session.createConsumer(queue); producer.send(session.createTextMessage("test")); + producer.send(session.createTextMessage("test2")); connection.start(); Assert.assertNull(consumer.receive(1000)); session.rollback(); @@ -130,6 +156,7 @@ public class InvestigationOpenwireTest extends BasicOpenWireTest { } + @Test public void testClientACK() throws Exception { try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95f76e2e/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 6351357..cd08b9e 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -301,6 +301,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { return 0; } + public boolean isFull() { + return false; + } + @Override public void applySetting(AddressSettings addressSettings) {
