http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java new file mode 100644 index 0000000..ed89a21 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerImpl.java @@ -0,0 +1,1239 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.File; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.MessageHandler; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.spi.core.remoting.ConsumerContext; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.FutureLatch; +import org.apache.activemq6.utils.PriorityLinkedList; +import org.apache.activemq6.utils.PriorityLinkedListImpl; +import org.apache.activemq6.utils.ReusableLatch; +import org.apache.activemq6.utils.TokenBucketLimiter; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version <tt>$Revision: 3603 $</tt> $Id: ClientConsumerImpl.java 3603 2008-01-21 18:49:20Z timfox $ + */ +public final class ClientConsumerImpl implements ClientConsumerInternal +{ + // Constants + // ------------------------------------------------------------------------------------ + + private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + + private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000; + + private static final int NUM_PRIORITIES = 10; + + public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq"); + + // Attributes + // ----------------------------------------------------------------------------------- + + private final ClientSessionInternal session; + + private final SessionContext sessionContext; + + private final ConsumerContext consumerContext; + + private final SimpleString filterString; + + private final SimpleString queueName; + + private final boolean browseOnly; + + private final Executor sessionExecutor; + + // For failover we can't send credits back + // while holding a lock or failover could dead lock eventually + // And we can't use the sessionExecutor as that's being used for message handlers + // for that reason we have a separate flowControlExecutor that's using the thread pool + // Which is a OrderedExecutor + private final Executor flowControlExecutor; + + // Number of pending calls on flow control + private final ReusableLatch pendingFlowControl = new ReusableLatch(0); + + private final int clientWindowSize; + + private final int ackBatchSize; + + private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(ClientConsumerImpl.NUM_PRIORITIES); + + private final Runner runner = new Runner(); + + private LargeMessageControllerImpl currentLargeMessageController; + + // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard the body + // before moving to the next message. + private ClientMessageInternal largeMessageReceived; + + private final TokenBucketLimiter rateLimiter; + + private volatile Thread receiverThread; + + private volatile Thread onMessageThread; + + private volatile MessageHandler handler; + + private volatile boolean closing; + + private volatile boolean closed; + + private volatile int creditsToSend; + + private volatile boolean failedOver; + + private volatile Exception lastException; + + private volatile int ackBytes; + + private volatile ClientMessageInternal lastAckedMessage; + + private boolean stopped = false; + + private long forceDeliveryCount; + + private final ClientSession.QueueQuery queueInfo; + + private volatile boolean ackIndividually; + + private final ClassLoader contextClassLoader; + + // Constructors + // --------------------------------------------------------------------------------- + + public ClientConsumerImpl(final ClientSessionInternal session, + final ConsumerContext consumerContext, + final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly, + final int clientWindowSize, + final int ackBatchSize, + final TokenBucketLimiter rateLimiter, + final Executor executor, + final Executor flowControlExecutor, + final SessionContext sessionContext, + final ClientSession.QueueQuery queueInfo, + final ClassLoader contextClassLoader) + { + this.consumerContext = consumerContext; + + this.queueName = queueName; + + this.filterString = filterString; + + this.browseOnly = browseOnly; + + this.sessionContext = sessionContext; + + this.session = session; + + this.rateLimiter = rateLimiter; + + sessionExecutor = executor; + + this.clientWindowSize = clientWindowSize; + + this.ackBatchSize = ackBatchSize; + + this.queueInfo = queueInfo; + + this.contextClassLoader = contextClassLoader; + + this.flowControlExecutor = flowControlExecutor; + } + + // ClientConsumer implementation + // ----------------------------------------------------------------- + + public ConsumerContext getConsumerContext() + { + return consumerContext; + } + + private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException + { + checkClosed(); + + if (largeMessageReceived != null) + { + // Check if there are pending packets to be received + largeMessageReceived.discardBody(); + largeMessageReceived = null; + } + + if (rateLimiter != null) + { + rateLimiter.limit(); + } + + if (handler != null) + { + throw HornetQClientMessageBundle.BUNDLE.messageHandlerSet(); + } + + if (clientWindowSize == 0) + { + startSlowConsumer(); + } + + receiverThread = Thread.currentThread(); + + // To verify if deliveryForced was already call + boolean deliveryForced = false; + // To control when to call deliveryForce + boolean callForceDelivery = false; + + long start = -1; + + long toWait = timeout == 0 ? Long.MAX_VALUE : timeout; + + try + { + while (true) + { + ClientMessageInternal m = null; + + synchronized (this) + { + while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0) + { + if (start == -1) + { + start = System.currentTimeMillis(); + } + + if (m == null && forcingDelivery) + { + if (stopped) + { + break; + } + + // we only force delivery once per call to receive + if (!deliveryForced) + { + callForceDelivery = true; + break; + } + } + + try + { + wait(toWait); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + if (m != null || closed) + { + break; + } + + long now = System.currentTimeMillis(); + + toWait -= now - start; + + start = now; + } + } + + if (failedOver) + { + if (m == null) + { + // if failed over and the buffer is null, we reset the state and try it again + failedOver = false; + deliveryForced = false; + toWait = timeout == 0 ? Long.MAX_VALUE : timeout; + continue; + } + else + { + failedOver = false; + } + } + + if (callForceDelivery) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Forcing delivery"); + } + // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks + sessionContext.forceDelivery(this, forceDeliveryCount++); + callForceDelivery = false; + deliveryForced = true; + continue; + } + + if (m != null) + { + session.workDone(); + + if (m.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) + { + long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE); + + // Need to check if forceDelivery was called at this call + // As we could be receiving a message that came from a previous call + if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1) + { + // forced delivery messages are discarded, nothing has been delivered by the queue + resetIfSlowConsumer(); + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null"); + } + + return null; + } + else + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call"); + } + // Ignore the message + continue; + } + } + // if we have already pre acked we can't expire + boolean expired = m.isExpired(); + + flowControlBeforeConsumption(m); + + if (expired) + { + m.discardBody(); + + session.expire(this, m); + + if (clientWindowSize == 0) + { + startSlowConsumer(); + } + + if (toWait > 0) + { + continue; + } + else + { + return null; + } + } + + if (m.isLargeMessage()) + { + largeMessageReceived = m; + } + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Returning " + m); + } + + return m; + } + else + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Returning null"); + } + resetIfSlowConsumer(); + return null; + } + } + } + finally + { + receiverThread = null; + } + } + + public ClientMessage receive(final long timeout) throws HornetQException + { + ClientMessage msg = receive(timeout, false); + + if (msg == null && !closed) + { + msg = receive(0, true); + } + + return msg; + } + + public ClientMessage receive() throws HornetQException + { + return receive(0, false); + } + + public ClientMessage receiveImmediate() throws HornetQException + { + return receive(0, true); + } + + public MessageHandler getMessageHandler() throws HornetQException + { + checkClosed(); + + return handler; + } + + // Must be synchronized since messages may be arriving while handler is being set and might otherwise end + // up not queueing enough executors - so messages get stranded + public synchronized ClientConsumerImpl setMessageHandler(final MessageHandler theHandler) throws HornetQException + { + checkClosed(); + + if (receiverThread != null) + { + throw HornetQClientMessageBundle.BUNDLE.inReceive(); + } + + boolean noPreviousHandler = handler == null; + + if (handler != theHandler && clientWindowSize == 0) + { + startSlowConsumer(); + } + + handler = theHandler; + + // if no previous handler existed queue up messages for delivery + if (handler != null && noPreviousHandler) + { + requeueExecutors(); + } + // if unsetting a previous handler may be in onMessage so wait for completion + else if (handler == null && !noPreviousHandler) + { + waitForOnMessageToComplete(true); + } + + return this; + } + + public void close() throws HornetQException + { + doCleanUp(true); + } + + /** + * To be used by MDBs to stop any more handling of messages. + * + * @throws HornetQException + * @param future the future to run once the onMessage Thread has completed + */ + public Thread prepareForClose(final FutureLatch future) throws HornetQException + { + closing = true; + + resetLargeMessageController(); + + //execute the future after the last onMessage call + sessionExecutor.execute(new Runnable() + { + @Override + public void run() + { + future.run(); + } + }); + + return onMessageThread; + } + + public void cleanUp() + { + try + { + doCleanUp(false); + } + catch (HornetQException e) + { + HornetQClientLogger.LOGGER.warn("problem cleaning up: " + this); + } + } + + public boolean isClosed() + { + return closed; + } + + public void stop(final boolean waitForOnMessage) throws HornetQException + { + waitForOnMessageToComplete(waitForOnMessage); + + if (browseOnly) + { + // stop shouldn't affect browser delivery + return; + } + + synchronized (this) + { + if (stopped) + { + return; + } + + stopped = true; + } + } + + public void clearAtFailover() + { + clearBuffer(); + + // failover will issue a start later + this.stopped = true; + + resetLargeMessageController(); + + lastAckedMessage = null; + + creditsToSend = 0; + + failedOver = true; + + ackIndividually = false; + } + + public synchronized void start() + { + stopped = false; + + requeueExecutors(); + } + + public Exception getLastException() + { + return lastException; + } + + // ClientConsumerInternal implementation + // -------------------------------------------------------------- + + public ClientSession.QueueQuery getQueueInfo() + { + return queueInfo; + } + + public SimpleString getFilterString() + { + return filterString; + } + + public SimpleString getQueueName() + { + return queueName; + } + + public boolean isBrowseOnly() + { + return browseOnly; + } + + public synchronized void handleMessage(final ClientMessageInternal message) throws Exception + { + if (closing) + { + // This is ok - we just ignore the message + return; + } + + if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) + { + handleCompressedMessage(message); + } + else + { + handleRegularMessage(message); + } + } + + private void handleRegularMessage(ClientMessageInternal message) + { + if (message.getAddress() == null) + { + message.setAddressTransient(queueInfo.getAddress()); + } + + message.onReceipt(this); + + if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) + { + // We have messages of different priorities so we need to ack them individually since the order + // of them in the ServerConsumerImpl delivery list might not be the same as the order they are + // consumed in, which means that acking all up to won't work + ackIndividually = true; + } + + // Add it to the buffer + buffer.addTail(message, message.getPriority()); + + if (handler != null) + { + // Execute using executor + if (!stopped) + { + queueExecutor(); + } + } + else + { + notify(); + } + } + + /** + * This method deals with messages arrived as regular message but its contents are compressed. + * Such messages come from message senders who are configured to compress large messages, and + * if some of the messages are compressed below the min-large-message-size limit, they are sent + * as regular messages. + * <p/> + * However when decompressing the message, we are not sure how large the message could be.. + * for that reason we fake a large message controller that will deal with the message as it was a large message + * <p/> + * Say that you sent a 1G message full of spaces. That could be just bellow 100K compressed but you wouldn't have + * enough memory to decompress it + */ + private void handleCompressedMessage(final ClientMessageInternal clMessage) throws Exception + { + ClientLargeMessageImpl largeMessage = new ClientLargeMessageImpl(); + largeMessage.retrieveExistingData(clMessage); + + File largeMessageCache = null; + + if (session.isCacheLargeMessageClient()) + { + largeMessageCache = File.createTempFile("tmp-large-message-" + largeMessage.getMessageID() + "-", + ".tmp"); + largeMessageCache.deleteOnExit(); + } + + ClientSessionFactory sf = session.getSessionFactory(); + ServerLocator locator = sf.getServerLocator(); + long callTimeout = locator.getCallTimeout(); + + currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessage.getLargeMessageSize(), callTimeout, largeMessageCache); + currentLargeMessageController.setLocal(true); + + //sets the packet + HornetQBuffer qbuff = clMessage.getBodyBuffer(); + int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex(); + final byte[] body = qbuff.readBytes(bytesToRead).toByteBuffer().array(); + + largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); + currentLargeMessageController.addPacket(body, body.length, false); + + handleRegularMessage(largeMessage); + } + + public synchronized void handleLargeMessage(final ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + { + if (closing) + { + // This is ok - we just ignore the message + return; + } + + // Flow control for the first packet, we will have others + File largeMessageCache = null; + + if (session.isCacheLargeMessageClient()) + { + largeMessageCache = File.createTempFile("tmp-large-message-" + clientLargeMessage.getMessageID() + "-", + ".tmp"); + largeMessageCache.deleteOnExit(); + } + + ClientSessionFactory sf = session.getSessionFactory(); + ServerLocator locator = sf.getServerLocator(); + long callTimeout = locator.getCallTimeout(); + + currentLargeMessageController = new LargeMessageControllerImpl(this, largeMessageSize, callTimeout, largeMessageCache); + + if (clientLargeMessage.isCompressed()) + { + clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController)); + } + else + { + clientLargeMessage.setLargeMessageController(currentLargeMessageController); + } + + handleRegularMessage(clientLargeMessage); + } + + public synchronized void handleLargeMessageContinuation(final byte[] chunk, final int flowControlSize, final boolean isContinues) throws Exception + { + if (closing) + { + return; + } + if (currentLargeMessageController == null) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize); + } + flowControl(flowControlSize, false); + } + else + { + currentLargeMessageController.addPacket(chunk, flowControlSize, isContinues); + } + } + + public void clear(boolean waitForOnMessage) throws HornetQException + { + synchronized (this) + { + // Need to send credits for the messages in the buffer + + Iterator<ClientMessageInternal> iter = buffer.iterator(); + + while (iter.hasNext()) + { + try + { + ClientMessageInternal message = iter.next(); + + if (message.isLargeMessage()) + { + ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal) message; + largeMessage.getLargeMessageController().cancel(); + } + + flowControlBeforeConsumption(message); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorClearingMessages(e); + } + } + + clearBuffer(); + + try + { + resetLargeMessageController(); + } + catch (Throwable e) + { + // nothing that could be done here + HornetQClientLogger.LOGGER.errorClearingMessages(e); + } + } + + // Need to send credits for the messages in the buffer + + waitForOnMessageToComplete(waitForOnMessage); + } + + private void resetLargeMessageController() + { + + LargeMessageController controller = currentLargeMessageController; + if (controller != null) + { + controller.cancel(); + currentLargeMessageController = null; + } + } + + public int getClientWindowSize() + { + return clientWindowSize; + } + + public int getBufferSize() + { + return buffer.size(); + } + + public void acknowledge(final ClientMessage message) throws HornetQException + { + ClientMessageInternal cmi = (ClientMessageInternal) message; + + if (ackIndividually) + { + individualAcknowledge(message); + } + else + { + ackBytes += message.getEncodeSize(); + + if (ackBytes >= ackBatchSize) + { + doAck(cmi); + } + else + { + lastAckedMessage = cmi; + } + } + } + + public void individualAcknowledge(ClientMessage message) throws HornetQException + { + if (lastAckedMessage != null) + { + flushAcks(); + } + + session.individualAcknowledge(this, message); + } + + public void flushAcks() throws HornetQException + { + if (lastAckedMessage != null) + { + doAck(lastAckedMessage); + } + } + + /** + * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl. + * So, this operation needs to be atomic. + * + * @param discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet + */ + public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException + { + if (clientWindowSize >= 0) + { + creditsToSend += messageBytes; + + if (creditsToSend >= clientWindowSize) + { + if (clientWindowSize == 0 && discountSlowConsumer) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer"); + } + + // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be + // always buffering one after received the first message + final int credits = creditsToSend - 1; + + creditsToSend = 0; + + if (credits > 0) + { + sendCredits(credits); + } + } + else + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("Sending " + messageBytes + " from flow-control"); + } + + final int credits = creditsToSend; + + creditsToSend = 0; + + if (credits > 0) + { + sendCredits(credits); + } + } + } + } + } + + // Public + // --------------------------------------------------------------------------------------- + + // Package protected + // --------------------------------------------------------------------------------------- + + // Protected + // --------------------------------------------------------------------------------------- + + // Private + // --------------------------------------------------------------------------------------- + + /** + * Sending a initial credit for slow consumers + */ + private void startSlowConsumer() + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer"); + } + sendCredits(1); + try + { + // We use an executor here to guarantee the messages will arrive in order. + // However when starting a slow consumer, we have to guarantee the credit was sent before we can perform any + // operations like forceDelivery + pendingFlowControl.await(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + // will just ignore and forward the ignored + Thread.currentThread().interrupt(); + } + } + + private void resetIfSlowConsumer() + { + if (clientWindowSize == 0) + { + sendCredits(0); + + // If resetting a slow consumer, we need to wait the execution + final CountDownLatch latch = new CountDownLatch(1); + flowControlExecutor.execute(new Runnable() + { + public void run() + { + latch.countDown(); + } + }); + + try + { + latch.await(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + + private void requeueExecutors() + { + for (int i = 0; i < buffer.size(); i++) + { + queueExecutor(); + } + } + + private void queueExecutor() + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery"); + } + + sessionExecutor.execute(runner); + } + + /** + * @param credits + */ + private void sendCredits(final int credits) + { + pendingFlowControl.countUp(); + flowControlExecutor.execute(new Runnable() + { + public void run() + { + try + { + sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits); + } + finally + { + pendingFlowControl.countDown(); + } + } + }); + } + + private void waitForOnMessageToComplete(boolean waitForOnMessage) + { + if (handler == null) + { + return; + } + + if (!waitForOnMessage || Thread.currentThread() == onMessageThread) + { + // If called from inside onMessage then return immediately - otherwise would block + return; + } + + org.apache.activemq6.utils.FutureLatch future = new FutureLatch(); + + sessionExecutor.execute(future); + + boolean ok = future.await(ClientConsumerImpl.CLOSE_TIMEOUT_MILLISECONDS); + + if (!ok) + { + HornetQClientLogger.LOGGER.timeOutWaitingForProcessing(); + } + } + + private void checkClosed() throws HornetQException + { + if (closed) + { + throw HornetQClientMessageBundle.BUNDLE.consumerClosed(); + } + } + + private void callOnMessage() throws Exception + { + if (closing || stopped) + { + return; + } + + session.workDone(); + + // We pull the message from the buffer from inside the Runnable so we can ensure priority + // ordering. If we just added a Runnable with the message to the executor immediately as we get it + // we could not do that + + ClientMessageInternal message; + + // Must store handler in local variable since might get set to null + // otherwise while this is executing and give NPE when calling onMessage + MessageHandler theHandler = handler; + + if (theHandler != null) + { + if (rateLimiter != null) + { + rateLimiter.limit(); + } + + failedOver = false; + + synchronized (this) + { + message = buffer.poll(); + } + + if (message != null) + { + if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) + { + //Ignore, this could be a relic from a previous receiveImmediate(); + return; + } + + + boolean expired = message.isExpired(); + + flowControlBeforeConsumption(message); + + if (!expired) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Calling handler.onMessage"); + } + final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + ClassLoader originalLoader = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(contextClassLoader); + + return originalLoader; + } + }); + + onMessageThread = Thread.currentThread(); + try + { + theHandler.onMessage(message); + } + finally + { + try + { + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + Thread.currentThread().setContextClassLoader(originalLoader); + return null; + } + }); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.warn(e.getMessage(), e); + } + + onMessageThread = null; + } + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Handler.onMessage done"); + } + + if (message.isLargeMessage()) + { + message.discardBody(); + } + } + else + { + session.expire(this, message); + } + + // If slow consumer, we need to send 1 credit to make sure we get another message + if (clientWindowSize == 0) + { + startSlowConsumer(); + } + } + } + } + + /** + * @param message + * @throws HornetQException + */ + private void flowControlBeforeConsumption(final ClientMessageInternal message) throws HornetQException + { + // Chunk messages will execute the flow control while receiving the chunks + if (message.getFlowControlSize() != 0) + { + // on large messages we should discount 1 on the first packets as we need continuity until the last packet + flowControl(message.getFlowControlSize(), !message.isLargeMessage()); + } + } + + private void doCleanUp(final boolean sendCloseMessage) throws HornetQException + { + try + { + if (closed) + { + return; + } + + // We need an extra flag closing, since we need to prevent any more messages getting queued to execute + // after this and we can't just set the closed flag to true here, since after/in onmessage the message + // might be acked and if the consumer is already closed, the ack will be ignored + closing = true; + + // Now we wait for any current handler runners to run. + waitForOnMessageToComplete(true); + + resetLargeMessageController(); + + closed = true; + + synchronized (this) + { + if (receiverThread != null) + { + // Wake up any receive() thread that might be waiting + notify(); + } + + handler = null; + + receiverThread = null; + } + + flushAcks(); + + clearBuffer(); + + if (sendCloseMessage) + { + sessionContext.closeConsumer(this); + } + } + catch (Throwable t) + { + // Consumer close should always return without exception + } + + session.removeConsumer(this); + } + + private void clearBuffer() + { + buffer.clear(); + } + + private void doAck(final ClientMessageInternal message) throws HornetQException + { + ackBytes = 0; + + lastAckedMessage = null; + + session.acknowledge(this, message); + } + + // Inner classes + // -------------------------------------------------------------------------------- + + private class Runner implements Runnable + { + public void run() + { + try + { + callOnMessage(); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.onMessageError(e); + + lastException = e; + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java new file mode 100644 index 0000000..a4f1551 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientConsumerInternal.java @@ -0,0 +1,72 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.utils.FutureLatch; + +/** + * A ClientConsumerInternal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface ClientConsumerInternal extends ClientConsumer +{ + SimpleString getQueueName(); + + SimpleString getFilterString(); + + boolean isBrowseOnly(); + + void handleMessage(ClientMessageInternal message) throws Exception; + + void handleLargeMessage(ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception; + + void handleLargeMessageContinuation(byte[] chunk, int flowControlSize, boolean isContinues) throws Exception; + + void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException; + + void clear(boolean waitForOnMessage) throws HornetQException; + + /** + * To be called by things like MDBs during shutdown of the server + * + * @throws HornetQException + * @param future + */ + Thread prepareForClose(FutureLatch future) throws HornetQException; + + void clearAtFailover(); + + int getClientWindowSize(); + + int getBufferSize(); + + void cleanUp() throws HornetQException; + + void acknowledge(ClientMessage message) throws HornetQException; + + void individualAcknowledge(ClientMessage message) throws HornetQException; + + void flushAcks() throws HornetQException; + + void stop(boolean waitForOnMessage) throws HornetQException; + + void start(); + + ClientSession.QueueQuery getQueueInfo(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java new file mode 100644 index 0000000..49f4d89 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageImpl.java @@ -0,0 +1,225 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer; +import org.apache.activemq6.utils.DataConstants; + +/** + * ClientLargeMessageImpl is only created when receiving large messages. + * <p> + * At the time of sending a regular Message is sent as we won't know the message is considered large + * until the buffer is filled up or the user set a streaming. + * @author clebertsuconic + */ +public final class ClientLargeMessageImpl extends ClientMessageImpl implements ClientLargeMessageInternal +{ + + // Used only when receiving large messages + private LargeMessageController largeMessageController; + + private long largeMessageSize; + + /** + * @param largeMessageSize the largeMessageSize to set + */ + public void setLargeMessageSize(long largeMessageSize) + { + this.largeMessageSize = largeMessageSize; + } + + public long getLargeMessageSize() + { + return this.largeMessageSize; + } + + // we only need this constructor as this is only used at decoding large messages on client + public ClientLargeMessageImpl() + { + super(); + } + + // Public -------------------------------------------------------- + + @Override + public int getEncodeSize() + { + if (bodyBuffer != null) + { + return super.getEncodeSize(); + } + else + { + return DataConstants.SIZE_INT + DataConstants.SIZE_INT + getHeadersAndPropertiesEncodeSize(); + } + } + + /** + * @return the largeMessage + */ + @Override + public boolean isLargeMessage() + { + return true; + } + + public void setLargeMessageController(final LargeMessageController controller) + { + largeMessageController = controller; + } + + public void checkCompletion() throws HornetQException + { + checkBuffer(); + } + + @Override + public HornetQBuffer getBodyBuffer() + { + + try + { + checkBuffer(); + } + catch (HornetQException e) + { + throw new RuntimeException(e.getMessage(), e); + } + + return bodyBuffer; + } + + @Override + public int getBodySize() + { + return getLongProperty(Message.HDR_LARGE_BODY_SIZE).intValue(); + } + + public LargeMessageController getLargeMessageController() + { + return largeMessageController; + } + + @Override + public void saveToOutputStream(final OutputStream out) throws HornetQException + { + if (bodyBuffer != null) + { + // The body was rebuilt on the client, so we need to behave as a regular message on this case + super.saveToOutputStream(out); + } + else + { + largeMessageController.saveBuffer(out); + } + } + + @Override + public ClientLargeMessageImpl setOutputStream(final OutputStream out) throws HornetQException + { + if (bodyBuffer != null) + { + super.setOutputStream(out); + } + else + { + largeMessageController.setOutputStream(out); + } + + return this; + } + + @Override + public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException + { + if (bodyBuffer != null) + { + return super.waitOutputStreamCompletion(timeMilliseconds); + } + else + { + return largeMessageController.waitCompletion(timeMilliseconds); + } + } + + @Override + public void discardBody() + { + if (bodyBuffer != null) + { + super.discardBody(); + } + else + { + largeMessageController.discardUnusedPackets(); + } + } + + private void checkBuffer() throws HornetQException + { + if (bodyBuffer == null) + { + + long bodySize = this.largeMessageSize + BODY_OFFSET; + if (bodySize > Integer.MAX_VALUE) + { + bodySize = Integer.MAX_VALUE; + } + createBody((int)bodySize); + + bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this); + + largeMessageController.saveBuffer(new HornetQOutputStream(bodyBuffer)); + } + } + + // Inner classes ------------------------------------------------- + + private static class HornetQOutputStream extends OutputStream + { + private final HornetQBuffer bufferOut; + + HornetQOutputStream(HornetQBuffer out) + { + this.bufferOut = out; + } + + @Override + public void write(int b) throws IOException + { + bufferOut.writeByte((byte)(b & 0xff)); + } + } + + public void retrieveExistingData(ClientMessageInternal clMessage) + { + this.messageID = clMessage.getMessageID(); + this.address = clMessage.getAddress(); + this.setUserID(clMessage.getUserID()); + this.setFlowControlSize(clMessage.getFlowControlSize()); + this.setDeliveryCount(clMessage.getDeliveryCount()); + this.type = clMessage.getType(); + this.durable = clMessage.isDurable(); + this.setExpiration(clMessage.getExpiration()); + this.timestamp = clMessage.getTimestamp(); + this.priority = clMessage.getPriority(); + this.properties = clMessage.getProperties(); + this.largeMessageSize = clMessage.getLongProperty(HDR_LARGE_BODY_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java new file mode 100644 index 0000000..61fdd67 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientLargeMessageInternal.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + + +/** + * A ClientLargeMessageInternal + * + * @author clebertsuconic + * + * + */ +public interface ClientLargeMessageInternal extends ClientMessageInternal +{ + + void setLargeMessageController(LargeMessageController controller); + + LargeMessageController getLargeMessageController(); + + void setLargeMessageSize(long size); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java new file mode 100644 index 0000000..58d29b6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageImpl.java @@ -0,0 +1,413 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQPropertyConversionException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.message.BodyEncoder; +import org.apache.activemq6.core.message.impl.MessageImpl; +import org.apache.activemq6.reader.MessageUtil; + +/** + * + * A ClientMessageImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * + */ +public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal +{ + // added this constant here so that the client package have no dependency on JMS + public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME; + + + private int deliveryCount; + + private ClientConsumerInternal consumer; + + private int flowControlSize = -1; + + /** Used on LargeMessages only */ + private InputStream bodyInputStream; + + /* + * Constructor for when reading from remoting + */ + public ClientMessageImpl() + { + } + + /* + * Construct messages before sending + */ + public ClientMessageImpl(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority, + final int initialMessageBufferSize) + { + super(type, durable, expiration, timestamp, priority, initialMessageBufferSize); + } + + @Override + public boolean isServerMessage() + { + return false; + } + + @Override + public void onReceipt(final ClientConsumerInternal consumer) + { + this.consumer = consumer; + } + + @Override + public ClientMessageImpl setDeliveryCount(final int deliveryCount) + { + this.deliveryCount = deliveryCount; + return this; + } + + @Override + public int getDeliveryCount() + { + return deliveryCount; + } + + @Override + public ClientMessageImpl acknowledge() throws HornetQException + { + if (consumer != null) + { + consumer.acknowledge(this); + } + + return this; + } + + @Override + public ClientMessageImpl individualAcknowledge() throws HornetQException + { + if (consumer != null) + { + consumer.individualAcknowledge(this); + } + + return this; + } + + @Override + public int getFlowControlSize() + { + if (flowControlSize < 0) + { + throw new IllegalStateException("Flow Control hasn't been set"); + } + return flowControlSize; + } + + @Override + public void setFlowControlSize(final int flowControlSize) + { + this.flowControlSize = flowControlSize; + } + + /** + * @return the largeMessage + */ + @Override + public boolean isLargeMessage() + { + return false; + } + + @Override + public boolean isCompressed() + { + return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED); + } + + @Override + public int getBodySize() + { + return buffer.writerIndex() - buffer.readerIndex(); + } + + @Override + public String toString() + { + return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; + } + + @Override + public void saveToOutputStream(final OutputStream out) throws HornetQException + { + try + { + byte[] readBuffer = new byte[getBodySize()]; + getBodyBuffer().readBytes(readBuffer); + out.write(readBuffer); + out.flush(); + } + catch (IOException e) + { + throw HornetQClientMessageBundle.BUNDLE.errorSavingBody(e); + } + } + + @Override + public ClientMessageImpl setOutputStream(final OutputStream out) throws HornetQException + { + saveToOutputStream(out); + return this; + } + + @Override + public boolean waitOutputStreamCompletion(final long timeMilliseconds) throws HornetQException + { + return true; + } + + @Override + public void discardBody() + { + } + + /** + * @return the bodyInputStream + */ + @Override + public InputStream getBodyInputStream() + { + return bodyInputStream; + } + + /** + * @param bodyInputStream the bodyInputStream to set + */ + @Override + public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream) + { + this.bodyInputStream = bodyInputStream; + return this; + } + + @Override + public BodyEncoder getBodyEncoder() throws HornetQException + { + return new DecodingContext(); + } + + @Override + public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value) + { + return (ClientMessageImpl) super.putBooleanProperty(key, value); + } + + @Override + public ClientMessageImpl putByteProperty(final SimpleString key, final byte value) + { + return (ClientMessageImpl) super.putByteProperty(key, value); + } + + @Override + public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value) + { + return (ClientMessageImpl) super.putBytesProperty(key, value); + } + + @Override + public ClientMessageImpl putCharProperty(SimpleString key, char value) + { + return (ClientMessageImpl) super.putCharProperty(key, value); + } + + @Override + public ClientMessageImpl putCharProperty(String key, char value) + { + return (ClientMessageImpl) super.putCharProperty(key, value); + } + + @Override + public ClientMessageImpl putShortProperty(final SimpleString key, final short value) + { + return (ClientMessageImpl) super.putShortProperty(key, value); + } + + @Override + public ClientMessageImpl putIntProperty(final SimpleString key, final int value) + { + return (ClientMessageImpl) super.putIntProperty(key, value); + } + + @Override + public ClientMessageImpl putLongProperty(final SimpleString key, final long value) + { + return (ClientMessageImpl) super.putLongProperty(key, value); + } + + @Override + public ClientMessageImpl putFloatProperty(final SimpleString key, final float value) + { + return (ClientMessageImpl) super.putFloatProperty(key, value); + } + + @Override + public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value) + { + return (ClientMessageImpl) super.putDoubleProperty(key, value); + } + + @Override + public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value) + { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + + @Override + public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException + { + return (ClientMessageImpl) super.putObjectProperty(key, value); + } + + @Override + public ClientMessageImpl putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException + { + return (ClientMessageImpl) super.putObjectProperty(key, value); + } + + @Override + public ClientMessageImpl putBooleanProperty(final String key, final boolean value) + { + return (ClientMessageImpl) super.putBooleanProperty(key, value); + } + + @Override + public ClientMessageImpl putByteProperty(final String key, final byte value) + { + return (ClientMessageImpl) super.putByteProperty(key, value); + } + + @Override + public ClientMessageImpl putBytesProperty(final String key, final byte[] value) + { + return (ClientMessageImpl) super.putBytesProperty(key, value); + } + + @Override + public ClientMessageImpl putShortProperty(final String key, final short value) + { + return (ClientMessageImpl) super.putShortProperty(key, value); + } + + @Override + public ClientMessageImpl putIntProperty(final String key, final int value) + { + return (ClientMessageImpl) super.putIntProperty(key, value); + } + + @Override + public ClientMessageImpl putLongProperty(final String key, final long value) + { + return (ClientMessageImpl) super.putLongProperty(key, value); + } + + @Override + public ClientMessageImpl putFloatProperty(final String key, final float value) + { + return (ClientMessageImpl) super.putFloatProperty(key, value); + } + + @Override + public ClientMessageImpl putDoubleProperty(final String key, final double value) + { + return (ClientMessageImpl) super.putDoubleProperty(key, value); + } + + @Override + public ClientMessageImpl putStringProperty(final String key, final String value) + { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + + @Override + public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) + { + return (ClientMessageImpl) super.writeBodyBufferBytes(bytes); + } + + @Override + public ClientMessageImpl writeBodyBufferString(String string) + { + return (ClientMessageImpl) super.writeBodyBufferString(string); + } + + private final class DecodingContext implements BodyEncoder + { + public DecodingContext() + { + } + + @Override + public void open() + { + getBodyBuffer().readerIndex(0); + } + + @Override + public void close() + { + } + + @Override + public long getLargeBodySize() + { + if (isLargeMessage()) + { + return getBodyBuffer().writerIndex(); + } + else + { + return getBodyBuffer().writerIndex() - BODY_OFFSET; + } + } + + @Override + public int encode(final ByteBuffer bufferRead) throws HornetQException + { + HornetQBuffer buffer1 = HornetQBuffers.wrappedBuffer(bufferRead); + return encode(buffer1, bufferRead.capacity()); + } + + @Override + public int encode(final HornetQBuffer bufferOut, final int size) + { + byte[] bytes = new byte[size]; + getWholeBuffer().readBytes(bytes); + bufferOut.writeBytes(bytes, 0, size); + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java new file mode 100644 index 0000000..ab55b88 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientMessageInternal.java @@ -0,0 +1,46 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.utils.TypedProperties; + +/** + * A ClientMessageInternal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface ClientMessageInternal extends ClientMessage +{ + + TypedProperties getProperties(); + + /** Size used for FlowControl */ + int getFlowControlSize(); + + /** Size used for FlowControl */ + void setFlowControlSize(int flowControlSize); + + void setAddressTransient(SimpleString address); + + void onReceipt(ClientConsumerInternal consumer); + + /** + * Discard unused packets (used on large-message) + */ + void discardBody(); + + boolean isCompressed(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java new file mode 100644 index 0000000..83360b7 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManager.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.spi.core.remoting.SessionContext; + +/** + * A ClientProducerCreditManager + * + * @author Tim Fox + * + * + */ +public interface ClientProducerCreditManager +{ + ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context); + + void returnCredits(SimpleString address); + + void receiveCredits(SimpleString address, int credits); + + void receiveFailCredits(SimpleString address, int credits); + + void reset(); + + void close(); + + int creditsMapSize(); + + int unReferencedCreditsSize(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java new file mode 100644 index 0000000..c0fd364 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditManagerImpl.java @@ -0,0 +1,233 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.spi.core.remoting.SessionContext; + +/** + * A ProducerCreditManager + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager +{ + public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000; + + private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>(); + + private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<SimpleString, ClientProducerCredits>(); + + private final ClientSessionInternal session; + + private int windowSize; + + public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) + { + this.session = session; + + this.windowSize = windowSize; + } + + public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context) + { + if (windowSize == -1) + { + return ClientProducerCreditsNoFlowControl.instance; + } + else + { + boolean needInit = false; + ClientProducerCredits credits; + + synchronized (this) + { + credits = producerCredits.get(address); + + if (credits == null) + { + // Doesn't need to be fair since session is single threaded + credits = new ClientProducerCreditsImpl(session, address, windowSize); + needInit = true; + + producerCredits.put(address, credits); + } + + if (!anon) + { + credits.incrementRefCount(); + + // Remove from anon credits (if there) + unReferencedCredits.remove(address); + } + else + { + addToUnReferencedCache(address, credits); + } + } + + // The init is done outside of the lock + // otherwise packages may arrive with flow control + // while this is still sending requests causing a dead lock + if (needInit) + { + credits.init(context); + } + + return credits; + } + } + + public synchronized void returnCredits(final SimpleString address) + { + ClientProducerCredits credits = producerCredits.get(address); + + if (credits != null && credits.decrementRefCount() == 0) + { + addToUnReferencedCache(address, credits); + } + } + + public synchronized void receiveCredits(final SimpleString address, final int credits) + { + ClientProducerCredits cr = producerCredits.get(address); + + if (cr != null) + { + cr.receiveCredits(credits); + } + } + + public synchronized void receiveFailCredits(final SimpleString address, int credits) + { + ClientProducerCredits cr = producerCredits.get(address); + + if (cr != null) + { + cr.receiveFailCredits(credits); + } + } + + public synchronized void reset() + { + for (ClientProducerCredits credits : producerCredits.values()) + { + credits.reset(); + } + } + + public synchronized void close() + { + windowSize = -1; + + for (ClientProducerCredits credits : producerCredits.values()) + { + credits.close(); + } + + producerCredits.clear(); + + unReferencedCredits.clear(); + } + + public synchronized int creditsMapSize() + { + return producerCredits.size(); + } + + public synchronized int unReferencedCreditsSize() + { + return unReferencedCredits.size(); + } + + private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) + { + unReferencedCredits.put(address, credits); + + if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) + { + // Remove the oldest entry + + Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator(); + + Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next(); + + iter.remove(); + + removeEntry(oldest.getKey(), oldest.getValue()); + } + } + + private void removeEntry(final SimpleString address, final ClientProducerCredits credits) + { + producerCredits.remove(address); + + credits.releaseOutstanding(); + + credits.close(); + } + + + static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits + { + static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl(); + + public void acquireCredits(int credits) throws InterruptedException + { + } + + public void receiveCredits(int credits) + { + } + + public void receiveFailCredits(int credits) + { + } + + public boolean isBlocked() + { + return false; + } + + public void init(SessionContext ctx) + { + } + + public void reset() + { + } + + public void close() + { + } + + public void incrementRefCount() + { + } + + public int decrementRefCount() + { + return 1; + } + + public void releaseOutstanding() + { + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java new file mode 100644 index 0000000..7ef4916 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCredits.java @@ -0,0 +1,46 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.spi.core.remoting.SessionContext; + +/** + * A ClientProducerCredits + * + * @author Tim Fox + * + * + */ +public interface ClientProducerCredits +{ + void acquireCredits(int credits) throws InterruptedException, HornetQException; + + void receiveCredits(int credits); + + void receiveFailCredits(int credits); + + boolean isBlocked(); + + void init(SessionContext sessionContext); + + void reset(); + + void close(); + + void incrementRefCount(); + + int decrementRefCount(); + + void releaseOutstanding(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java new file mode 100644 index 0000000..2b420d5 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerCreditsImpl.java @@ -0,0 +1,228 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.spi.core.remoting.SessionContext; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A ClientProducerCreditsImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class ClientProducerCreditsImpl implements ClientProducerCredits +{ + private final Semaphore semaphore; + + private final int windowSize; + + private volatile boolean closed; + + private boolean blocked; + + private final SimpleString address; + + private final ClientSessionInternal session; + + private int pendingCredits; + + private int arriving; + + private int refCount; + + private boolean serverRespondedWithFail; + + private SessionContext sessionContext; + + public ClientProducerCreditsImpl(final ClientSessionInternal session, + final SimpleString address, + final int windowSize) + { + this.session = session; + + this.address = address; + + this.windowSize = windowSize / 2; + + // Doesn't need to be fair since session is single threaded + + semaphore = new Semaphore(0, false); + } + + public void init(SessionContext sessionContext) + { + // We initial request twice as many credits as we request in subsequent requests + // This allows the producer to keep sending as more arrive, minimising pauses + checkCredits(windowSize); + + this.sessionContext = sessionContext; + + this.sessionContext.linkFlowControl(address, this); + } + + public void acquireCredits(final int credits) throws InterruptedException, HornetQException + { + checkCredits(credits); + + + boolean tryAcquire; + + synchronized (this) + { + tryAcquire = semaphore.tryAcquire(credits); + } + + if (!tryAcquire) + { + if (!closed) + { + this.blocked = true; + try + { + while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) + { + // I'm using string concatenation here in case address is null + // better getting a "null" string than a NPE + HornetQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); + } + } + finally + { + this.blocked = false; + } + } + } + + + synchronized (this) + { + pendingCredits -= credits; + } + + // check to see if the blocking mode is FAIL on the server + synchronized (this) + { + if (serverRespondedWithFail) + { + serverRespondedWithFail = false; + + // remove existing credits to force the client to ask the server for more on the next send + semaphore.drainPermits(); + pendingCredits = 0; + arriving = 0; + + throw HornetQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits); + } + } + } + + public boolean isBlocked() + { + return blocked; + } + + public int getBalance() + { + return semaphore.availablePermits(); + } + + public void receiveCredits(final int credits) + { + synchronized (this) + { + arriving -= credits; + } + + semaphore.release(credits); + } + + public void receiveFailCredits(final int credits) + { + serverRespondedWithFail = true; + // receive credits like normal to keep the sender from blocking + receiveCredits(credits); + } + + public synchronized void reset() + { + // Any pendingCredits credits from before failover won't arrive, so we re-initialise + + semaphore.drainPermits(); + + int beforeFailure = pendingCredits; + + pendingCredits = 0; + arriving = 0; + + // If we are waiting for more credits than what's configured, then we need to use what we tried before + // otherwise the client may starve as the credit will never arrive + checkCredits(Math.max(windowSize * 2, beforeFailure)); + } + + public void close() + { + // Closing a producer that is blocking should make it return + closed = true; + + semaphore.release(Integer.MAX_VALUE / 2); + } + + public synchronized void incrementRefCount() + { + refCount++; + } + + public synchronized int decrementRefCount() + { + return --refCount; + } + + public synchronized void releaseOutstanding() + { + semaphore.drainPermits(); + } + + private void checkCredits(final int credits) + { + int needed = Math.max(credits, windowSize); + + int toRequest = -1; + + synchronized (this) + { + if (semaphore.availablePermits() + arriving < needed) + { + toRequest = needed - arriving; + + pendingCredits += toRequest; + arriving += toRequest; + } + } + + if (toRequest != -1) + { + requestCredits(toRequest); + } + } + + private void requestCredits(final int credits) + { + session.sendProducerCreditsMessage(credits, address); + } +} \ No newline at end of file
