http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/DelegatingSession.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/DelegatingSession.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/DelegatingSession.java new file mode 100644 index 0000000..9d6f2d0 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/DelegatingSession.java @@ -0,0 +1,655 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.Set; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientProducer; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.FailoverEventListener; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.ConsumerContext; +import org.apache.activemq6.utils.ConcurrentHashSet; + +/** + * A DelegatingSession + * <p> + * We wrap the real session, so we can add a finalizer on this and close the session + * on GC if it has not already been closed + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ +public class DelegatingSession implements ClientSessionInternal +{ + private final ClientSessionInternal session; + + private final Exception creationStack; + + private volatile boolean closed; + + private static Set<DelegatingSession> sessions = new ConcurrentHashSet<DelegatingSession>(); + + public static volatile boolean debug; + + public static void dumpSessionCreationStacks() + { + HornetQClientLogger.LOGGER.dumpingSessionStacks(); + + for (DelegatingSession session : DelegatingSession.sessions) + { + HornetQClientLogger.LOGGER.dumpingSessionStack(session.creationStack); + } + } + + + public ClientSessionInternal 
getInternalSession() + { + return session; + } + + @Override + protected void finalize() throws Throwable + { + // In some scenarios we have seen the JDK finalizing the DelegatingSession while the call to session.close() was still in progress + // + if (!closed && !session.isClosed()) + { + HornetQClientLogger.LOGGER.clientSessionNotClosed(creationStack, System.identityHashCode(this)); + + close(); + } + + super.finalize(); + } + + public DelegatingSession(final ClientSessionInternal session) + { + this.session = session; + + creationStack = new Exception(); + + if (DelegatingSession.debug) + { + DelegatingSession.sessions.add(this); + } + } + + public boolean isClosing() + { + return session.isClosing(); + } + + public void acknowledge(final ClientConsumer consumer, final Message message) throws HornetQException + { + session.acknowledge(consumer, message); + } + + public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws HornetQException + { + session.individualAcknowledge(consumer, message); + } + + public void addConsumer(final ClientConsumerInternal consumer) + { + session.addConsumer(consumer); + } + + public void addFailureListener(final SessionFailureListener listener) + { + session.addFailureListener(listener); + } + + public void addFailoverListener(FailoverEventListener listener) + { + session.addFailoverListener(listener); + } + + public void addProducer(final ClientProducerInternal producer) + { + session.addProducer(producer); + } + + public AddressQuery addressQuery(final SimpleString address) throws HornetQException + { + return session.addressQuery(address); + } + + public void cleanUp(boolean failingOver) throws HornetQException + { + session.cleanUp(failingOver); + } + + public void close() throws HornetQException + { + closed = true; + + if (DelegatingSession.debug) + { + DelegatingSession.sessions.remove(this); + } + + session.close(); + } + + public void commit() throws HornetQException + { + 
session.commit(); + } + + public void commit(final Xid xid, final boolean onePhase) throws XAException + { + session.commit(xid, onePhase); + } + + public ClientMessage createMessage(final boolean durable) + { + return session.createMessage(durable); + } + + public ClientMessage createMessage(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority) + { + return session.createMessage(type, durable, expiration, timestamp, priority); + } + + public ClientMessage createMessage(final byte type, final boolean durable) + { + return session.createMessage(type, durable); + } + + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, filterString, browseOnly); + } + + public ClientConsumer createConsumer(final SimpleString queueName, + final SimpleString filterString, + final int windowSize, + final int maxRate, + final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, filterString, windowSize, maxRate, browseOnly); + } + + public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString) throws HornetQException + { + return session.createConsumer(queueName, filterString); + } + + public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException + { + return session.createConsumer(queueName); + } + + public ClientConsumer createConsumer(final String queueName, final String filterString, final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, filterString, browseOnly); + } + + public ClientConsumer createConsumer(final String queueName, + final String filterString, + final int windowSize, + final int maxRate, + final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, filterString, windowSize, 
maxRate, browseOnly); + } + + public ClientConsumer createConsumer(final String queueName, final String filterString) throws HornetQException + { + return session.createConsumer(queueName, filterString); + } + + public ClientConsumer createConsumer(final String queueName) throws HornetQException + { + return session.createConsumer(queueName); + } + + public ClientConsumer createConsumer(final SimpleString queueName, final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, browseOnly); + } + + public ClientConsumer createConsumer(final String queueName, final boolean browseOnly) throws HornetQException + { + return session.createConsumer(queueName, browseOnly); + } + + public ClientProducer createProducer() throws HornetQException + { + return session.createProducer(); + } + + public ClientProducer createProducer(final SimpleString address, final int rate) throws HornetQException + { + return session.createProducer(address, rate); + } + + public ClientProducer createProducer(final SimpleString address) throws HornetQException + { + return session.createProducer(address); + } + + public ClientProducer createProducer(final String address) throws HornetQException + { + return session.createProducer(address); + } + + public void createQueue(final String address, final String queueName) throws HornetQException + { + session.createQueue(address, queueName); + } + + public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException + { + session.createQueue(address, queueName); + } + + public void createQueue(final SimpleString address, final SimpleString queueName, final boolean durable) throws HornetQException + { + session.createQueue(address, queueName, durable); + } + + @Override + public void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws HornetQException + { + session.createSharedQueue(address, queueName, durable); + } + + @Override + public void 
createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable) throws HornetQException + { + session.createSharedQueue(address, queueName, filter, durable); + } + + public void createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable) throws HornetQException + { + session.createQueue(address, queueName, filterString, durable); + } + + public void createQueue(final String address, final String queueName, final boolean durable) throws HornetQException + { + session.createQueue(address, queueName, durable); + } + + public void createQueue(final String address, + final String queueName, + final String filterString, + final boolean durable) throws HornetQException + { + session.createQueue(address, queueName, filterString, durable); + } + + public void createTemporaryQueue(final SimpleString address, final SimpleString queueName, final SimpleString filter) throws HornetQException + { + session.createTemporaryQueue(address, queueName, filter); + } + + public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws HornetQException + { + session.createTemporaryQueue(address, queueName); + } + + public void createTemporaryQueue(final String address, final String queueName, final String filter) throws HornetQException + { + session.createTemporaryQueue(address, queueName, filter); + } + + public void createTemporaryQueue(final String address, final String queueName) throws HornetQException + { + session.createTemporaryQueue(address, queueName); + } + + public void deleteQueue(final SimpleString queueName) throws HornetQException + { + session.deleteQueue(queueName); + } + + public void deleteQueue(final String queueName) throws HornetQException + { + session.deleteQueue(queueName); + } + + public void end(final Xid xid, final int flags) throws XAException + { + session.end(xid, flags); + } + + public void expire(final 
ClientConsumer consumer, final Message message) throws HornetQException + { + session.expire(consumer, message); + } + + public void forget(final Xid xid) throws XAException + { + session.forget(xid); + } + + public RemotingConnection getConnection() + { + return session.getConnection(); + } + + public int getMinLargeMessageSize() + { + return session.getMinLargeMessageSize(); + } + + public String getName() + { + return session.getName(); + } + + public int getTransactionTimeout() throws XAException + { + return session.getTransactionTimeout(); + } + + public int getVersion() + { + return session.getVersion(); + } + + public XAResource getXAResource() + { + return session.getXAResource(); + } + + public void preHandleFailover(RemotingConnection connection) + { + session.preHandleFailover(connection); + } + + public void handleFailover(final RemotingConnection backupConnection, HornetQException cause) + { + session.handleFailover(backupConnection, cause); + } + + @Override + public void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception + { + session.handleReceiveMessage(consumerID, message); + } + + @Override + public void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + { + session.handleReceiveLargeMessage(consumerID, clientLargeMessage, largeMessageSize); + } + + @Override + public void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + { + session.handleReceiveContinuation(consumerID, chunk, flowControlSize, isContinues); + } + + @Override + public void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException + { + session.handleConsumerDisconnect(consumerContext); + } + + public boolean isAutoCommitAcks() + { + return session.isAutoCommitAcks(); + } + + public boolean isAutoCommitSends() + { + return 
session.isAutoCommitSends(); + } + + public boolean isBlockOnAcknowledge() + { + return session.isBlockOnAcknowledge(); + } + + public boolean isCacheLargeMessageClient() + { + return session.isCacheLargeMessageClient(); + } + + public boolean isClosed() + { + return session.isClosed(); + } + + public boolean isSameRM(final XAResource xares) throws XAException + { + return session.isSameRM(xares); + } + + public boolean isXA() + { + return session.isXA(); + } + + public int prepare(final Xid xid) throws XAException + { + return session.prepare(xid); + } + + public QueueQuery queueQuery(final SimpleString queueName) throws HornetQException + { + return session.queueQuery(queueName); + } + + public Xid[] recover(final int flag) throws XAException + { + return session.recover(flag); + } + + public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException + { + session.removeConsumer(consumer); + } + + public boolean removeFailureListener(final SessionFailureListener listener) + { + return session.removeFailureListener(listener); + } + + public boolean removeFailoverListener(FailoverEventListener listener) + { + return session.removeFailoverListener(listener); + } + + public void removeProducer(final ClientProducerInternal producer) + { + session.removeProducer(producer); + } + + public void rollback() throws HornetQException + { + session.rollback(); + } + + public boolean isRollbackOnly() + { + return session.isRollbackOnly(); + } + + public void rollback(final boolean considerLastMessageAsDelivered) throws HornetQException + { + session.rollback(considerLastMessageAsDelivered); + } + + public void rollback(final Xid xid) throws XAException + { + session.rollback(xid); + } + + public DelegatingSession setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + { + session.setSendAcknowledgementHandler(handler); + return this; + } + + public boolean setTransactionTimeout(final int seconds) throws XAException + { + return 
session.setTransactionTimeout(seconds); + } + + public void resetIfNeeded() throws HornetQException + { + session.resetIfNeeded(); + } + + public DelegatingSession start() throws HornetQException + { + session.start(); + return this; + } + + public void start(final Xid xid, final int flags) throws XAException + { + session.start(xid, flags); + } + + public void stop() throws HornetQException + { + session.stop(); + } + + public ClientSessionFactory getSessionFactory() + { + return session.getSessionFactory(); + } + + public void setForceNotSameRM(final boolean force) + { + session.setForceNotSameRM(force); + } + + public void workDone() + { + session.workDone(); + } + + public void sendProducerCreditsMessage(final int credits, final SimpleString address) + { + session.sendProducerCreditsMessage(credits, address); + } + + public ClientProducerCredits getCredits(final SimpleString address, final boolean anon) + { + return session.getCredits(address, anon); + } + + public void returnCredits(final SimpleString address) + { + session.returnCredits(address); + } + + public void handleReceiveProducerCredits(final SimpleString address, final int credits) + { + session.handleReceiveProducerCredits(address, credits); + } + + public void handleReceiveProducerFailCredits(final SimpleString address, final int credits) + { + session.handleReceiveProducerFailCredits(address, credits); + } + + public ClientProducerCreditManager getProducerCreditManager() + { + return session.getProducerCreditManager(); + } + + public void setAddress(Message message, SimpleString address) + { + session.setAddress(message, address); + } + + public void setPacketSize(int packetSize) + { + session.setPacketSize(packetSize); + } + + public void addMetaData(String key, String data) throws HornetQException + { + session.addMetaData(key, data); + } + + public boolean isCompressLargeMessages() + { + return session.isCompressLargeMessages(); + } + + @Override + public String toString() + { + return 
"DelegatingSession [session=" + session + "]"; + } + + @Override + public void addUniqueMetaData(String key, String data) throws HornetQException + { + session.addUniqueMetaData(key, data); + + } + + public void startCall() + { + session.startCall(); + } + + public void endCall() + { + session.endCall(); + } + + @Override + public void setStopSignal() + { + session.setStopSignal(); + } + + @Override + public boolean isConfirmationWindowEnabled() + { + return session.isConfirmationWindowEnabled(); + } + + @Override + public void scheduleConfirmation(SendAcknowledgementHandler handler, Message msg) + { + session.scheduleConfirmation(handler, msg); + } + + @Override + public String getNodeId() + { + return session.getNodeId(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/HornetQXAResource.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/HornetQXAResource.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/HornetQXAResource.java new file mode 100644 index 0000000..40c0a7e --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/HornetQXAResource.java @@ -0,0 +1,20 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import javax.transaction.xa.XAResource; +/** + * An {@link XAResource} that can additionally hand out an {@link XAResource} through + * {@link #getResource()}. + * <p> + * NOTE(review): presumably implemented by wrappers to expose the resource they delegate to; confirm + * against the implementing classes. + */ +public interface HornetQXAResource extends XAResource +{ + XAResource getResource(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageController.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageController.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageController.java new file mode 100644 index 0000000..d75ab8a --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageController.java @@ -0,0 +1,66 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.OutputStream; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; + +/** + * A {@link HornetQBuffer} for large messages: its content is supplied chunk by chunk through + * {@link #addPacket(byte[], int, boolean)} and can be redirected to an {@link OutputStream} with + * {@link #setOutputStream(OutputStream)} or {@link #saveBuffer(OutputStream)}. + * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + */ +public interface LargeMessageController extends HornetQBuffer +{ + /** + * Returns the size of this buffer. + */ + long getSize(); + + /** + * Discards packets unused by this buffer. 
+ */ + void discardUnusedPackets(); + + /** + * Closes this buffer. + */ + void close(); + + /** + * Cancels this buffer. + */ + void cancel(); + + /** + * Sets the OutputStream of this buffer to the specified output. + */ + void setOutputStream(final OutputStream output) throws HornetQException; + + /** + * Saves this buffer to the specified output. This is just a blocking version of + * {@link #setOutputStream(OutputStream)}. + */ + void saveBuffer(final OutputStream output) throws HornetQException; +/** + * Adds one chunk of the message body to this buffer. + * <p> + * In {@code LargeMessageControllerImpl} the chunk is written straight to the output stream when + * one has been set and queued otherwise. + * + * @param chunk the bytes of this chunk + * @param flowControlSize the flow-control size associated with this chunk + * @param isContinues {@code true} when more chunks follow, {@code false} for the last chunk + */ + void addPacket(byte[] chunk, int flowControlSize, boolean isContinues); + + /** + * Waits for the completion for the specified waiting time (in milliseconds). + * <p> + * In {@code LargeMessageControllerImpl} a {@code timeWait} of 0 makes the wait use the read + * timeout the controller was constructed with. + * + * @param timeWait the time to wait, in milliseconds + * @return in {@code LargeMessageControllerImpl}, {@code false} when no output stream has been + * set, otherwise whether the last chunk has been received + * @throws HornetQException in {@code LargeMessageControllerImpl}, on timeout or when an earlier + * failure was recorded while handling the message + */ + boolean waitCompletion(long timeWait) throws HornetQException; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageControllerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageControllerImpl.java new file mode 100644 index 0000000..e15edc4 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/LargeMessageControllerImpl.java @@ -0,0 +1,1505 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.utils.DataConstants; +import org.apache.activemq6.utils.UTF8Util; + +/** + * This class aggregates several {@link org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage} + * as it was being handled + * by a single buffer. This buffer can be consumed as messages are arriving, and it will hold the + * packets until they are read using the ChannelBuffer interface, or the setOutputStream or + * saveStream are called. 
+ * + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class LargeMessageControllerImpl implements LargeMessageController +{ + // Constants ----------------------------------------------------- + + private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported"; + + // Attributes ---------------------------------------------------- + + private final ClientConsumerInternal consumerInternal; + + private final LinkedBlockingQueue<LargeData> largeMessageData = new LinkedBlockingQueue<>(); + + private volatile LargeData currentPacket = null; + + private final long totalSize; + + private final int bufferSize; + + private boolean streamEnded = false; + + private boolean streamClosed = false; + + private final long readTimeout; + + private long readerIndex = 0; + + /** + * This is to control if packets are arriving for a better timeout control + */ + private boolean packetAdded = false; + + private long packetPosition = -1; + + private long lastIndex = 0; + + private long packetLastPosition = -1; + + private OutputStream outStream; + + // There's no need to wait a synchronization + // we just set the exception and let other threads to get it as soon as possible + private volatile Exception handledException; + + private final FileCache fileCache; + + private boolean local = false; + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public LargeMessageControllerImpl(final ClientConsumerInternal consumerInternal, + final long totalSize, + final long readTimeout) + { + this(consumerInternal, totalSize, readTimeout, null); + } + + public LargeMessageControllerImpl(final ClientConsumerInternal consumerInternal, + final long totalSize, + final long readTimeout, + final File cachedFile) + { + this(consumerInternal, totalSize, readTimeout, cachedFile, 10 * 1024); + } + + public LargeMessageControllerImpl(final 
ClientConsumerInternal consumerInternal, + final long totalSize, + final long readTimeout, + final File cachedFile, + final int bufferSize) + { + this.consumerInternal = consumerInternal; + this.readTimeout = readTimeout; + this.totalSize = totalSize; + if (cachedFile == null) + { + fileCache = null; + } + else + { + fileCache = new FileCache(cachedFile); + } + this.bufferSize = bufferSize; + } + + // Public -------------------------------------------------------- + + public void setLocal(boolean local) + { + this.local = local; + } + + public void discardUnusedPackets() + { + if (outStream == null) + { + if (local) return; + try + { + checkForPacket(totalSize - 1); + } + catch (Throwable ignored) + { + } + } + } + + /** + * TODO: move this to ConsumerContext as large message is a protocol specific thing + * Add a buff to the List, or save it to the OutputStream if set + */ + public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) + { + int flowControlCredit = 0; + + synchronized (this) + { + packetAdded = true; + if (outStream != null) + { + try + { + if (!isContinues) + { + streamEnded = true; + } + + if (fileCache != null) + { + fileCache.cachePackage(chunk); + } + + outStream.write(chunk); + + flowControlCredit = flowControlSize; + + notifyAll(); + + if (streamEnded) + { + outStream.close(); + } + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorAddingPacket(e); + handledException = e; + } + } + else + { + if (fileCache != null) + { + try + { + fileCache.cachePackage(chunk); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorAddingPacket(e); + handledException = e; + } + } + + + largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues)); + } + } + + if (flowControlCredit != 0) + { + try + { + consumerInternal.flowControl(flowControlCredit, !isContinues); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorAddingPacket(e); + handledException = e; + } + } + } + + public void cancel() + { 
+ this.handledException = HornetQClientMessageBundle.BUNDLE.largeMessageInterrupted(); + + synchronized (this) + { + int totalSize = 0; + LargeData polledPacket = null; + while ((polledPacket = largeMessageData.poll()) != null) + { + totalSize += polledPacket.getFlowControlSize(); + } + + try + { + consumerInternal.flowControl(totalSize, false); + } + catch (Exception ignored) + { + // what else can we do here? + HornetQClientLogger.LOGGER.errorCallingCancel(ignored); + } + + largeMessageData.offer(new LargeData()); + streamEnded = true; + streamClosed = true; + + notifyAll(); + } + } + + public synchronized void close() + { + if (fileCache != null) + { + fileCache.close(); + } + } + + public void setOutputStream(final OutputStream output) throws HornetQException + { + + int totalFlowControl = 0; + boolean continues = false; + + synchronized (this) + { + if (currentPacket != null) + { + sendPacketToOutput(output, currentPacket); + currentPacket = null; + } + while (handledException == null) + { + LargeData packet = largeMessageData.poll(); + if (packet == null) + { + break; + } + totalFlowControl += packet.getFlowControlSize(); + + continues = packet.isContinues(); + sendPacketToOutput(output, packet); + } + + checkException(); + outStream = output; + } + + if (totalFlowControl > 0) + { + consumerInternal.flowControl(totalFlowControl, !continues); + } + } + + public synchronized void saveBuffer(final OutputStream output) throws HornetQException + { + if (streamClosed) + { + throw HornetQClientMessageBundle.BUNDLE.largeMessageLostSession(); + } + setOutputStream(output); + waitCompletion(0); + } + + /** + * @param timeWait Milliseconds to Wait. 0 means forever + * @throws HornetQException + */ + public synchronized boolean waitCompletion(final long timeWait) throws HornetQException + { + if (outStream == null) + { + // There is no stream.. 
it will never achieve the end of streaming + return false; + } + + long timeOut; + + // If timeWait = 0, we will use the readTimeout + // And we will check if no packets have arrived within readTimeout milliseconds + if (timeWait != 0) + { + timeOut = System.currentTimeMillis() + timeWait; + } + else + { + timeOut = System.currentTimeMillis() + readTimeout; + } + + while (!streamEnded && handledException == null) + { + try + { + this.wait(timeWait == 0 ? readTimeout : timeWait); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + if (!streamEnded && handledException == null) + { + if (timeWait != 0 && System.currentTimeMillis() > timeOut) + { + + // TODO: what to do here? + //throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, + // "Timeout waiting for LargeMessage Body"); + throw HornetQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); + } + else if (System.currentTimeMillis() > timeOut && !packetAdded) + { + // throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, + // "No packets have arrived within " + readTimeout + " milliseconds"); + throw HornetQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); + } + } + } + + checkException(); + + return streamEnded; + + } + + /** + * @throws HornetQException + */ + private void checkException() throws HornetQException + { + // it's not needed to copy it as we never set it back to null + // once the exception is set, the controller is pretty much useless + if (handledException != null) + { + if (handledException instanceof HornetQException) + { + throw (HornetQException)handledException; + } + else + { + throw new HornetQException(HornetQExceptionType.LARGE_MESSAGE_ERROR_BODY, + "Error on saving LargeMessageBufferImpl", + handledException); + } + } + } + + // Channel Buffer Implementation --------------------------------- + + @Override + public int capacity() + { + return -1; + } + + public byte readByte() + { + return getByte(readerIndex++); + } 
+ + @Override + public byte getByte(final int index) + { + return getByte((long)index); + } + + private byte getByte(final long index) + { + checkForPacket(index); + + if (fileCache != null && index < packetPosition) + { + return fileCache.getByteFromCache(index); + } + else + { + return currentPacket.getChunk()[(int)(index - packetPosition)]; + } + } + + @Override + public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length) + { + byte[] destBytes = new byte[length]; + getBytes(index, destBytes); + dst.setBytes(dstIndex, destBytes); + } + + private void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length) + { + byte[] destBytes = new byte[length]; + getBytes(index, destBytes); + dst.setBytes(dstIndex, destBytes); + } + + @Override + public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length) + { + byte[] bytesToGet = new byte[length]; + + getBytes(index, bytesToGet); + + System.arraycopy(bytesToGet, 0, dst, dstIndex, length); + } + + public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length) + { + byte[] bytesToGet = new byte[length]; + + getBytes(index, bytesToGet); + + System.arraycopy(bytesToGet, 0, dst, dstIndex, length); + } + + @Override + public void getBytes(final int index, final ByteBuffer dst) + { + byte[] bytesToGet = new byte[dst.remaining()]; + getBytes(index, bytesToGet); + dst.put(bytesToGet); + } + + public void getBytes(final long index, final ByteBuffer dst) + { + byte[] bytesToGet = new byte[dst.remaining()]; + getBytes(index, bytesToGet); + dst.put(bytesToGet); + } + + public void getBytes(final int index, final OutputStream out, final int length) throws IOException + { + byte[] bytesToGet = new byte[length]; + getBytes(index, bytesToGet); + out.write(bytesToGet); + } + + public void getBytes(final long index, final OutputStream out, final int length) throws IOException + { + byte[] bytesToGet = 
new byte[length]; + getBytes(index, bytesToGet); + out.write(bytesToGet); + } + + public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException + { + byte[] bytesToGet = new byte[length]; + getBytes(index, bytesToGet); + return out.write(ByteBuffer.wrap(bytesToGet)); + } + + public int getInt(final int index) + { + return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 | + (getByte(index + 2) & 0xff) << 8 | + (getByte(index + 3) & 0xff) << 0; + } + + public int getInt(final long index) + { + return (getByte(index) & 0xff) << 24 | (getByte(index + 1) & 0xff) << 16 | + (getByte(index + 2) & 0xff) << 8 | + (getByte(index + 3) & 0xff) << 0; + } + + @Override + public long getLong(final int index) + { + return ((long)getByte(index) & 0xff) << 56 | ((long)getByte(index + 1) & 0xff) << 48 | + ((long)getByte(index + 2) & 0xff) << 40 | + ((long)getByte(index + 3) & 0xff) << 32 | + ((long)getByte(index + 4) & 0xff) << 24 | + ((long)getByte(index + 5) & 0xff) << 16 | + ((long)getByte(index + 6) & 0xff) << 8 | + ((long)getByte(index + 7) & 0xff) << 0; + } + + public long getLong(final long index) + { + return ((long)getByte(index) & 0xff) << 56 | ((long)getByte(index + 1) & 0xff) << 48 | + ((long)getByte(index + 2) & 0xff) << 40 | + ((long)getByte(index + 3) & 0xff) << 32 | + ((long)getByte(index + 4) & 0xff) << 24 | + ((long)getByte(index + 5) & 0xff) << 16 | + ((long)getByte(index + 6) & 0xff) << 8 | + ((long)getByte(index + 7) & 0xff) << 0; + } + + @Override + public short getShort(final int index) + { + return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF); + } + + public short getShort(final long index) + { + return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF); + } + + private int getUnsignedMedium(final int index) + { + return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | (getByte(index + 2) & 0xff) << 0; + } + + public int getUnsignedMedium(final long index) + { + 
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | (getByte(index + 2) & 0xff) << 0; + } + + @Override + public void setByte(final int index, final byte value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setBytes(final int index, final byte[] src, final int srcIndex, final int length) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setBytes(final int index, final ByteBuffer src) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setInt(final int index, final int value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setLong(final int index, final long value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public void setShort(final int index, final short value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + @Override + public ByteBuffer toByteBuffer(final int index, final int length) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public int readerIndex() + { + return (int)readerIndex; + } + + public void readerIndex(final int readerIndex) + { + try + { + checkForPacket(readerIndex); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorReadingIndex(e); + throw new RuntimeException(e.getMessage(), e); + } + this.readerIndex = readerIndex; + } + + public int writerIndex() + { + return (int)totalSize; + } + + public long getSize() + { + return totalSize; + } + + public void writerIndex(final 
int writerIndex) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void setIndex(final int readerIndex, final int writerIndex) + { + try + { + checkForPacket(readerIndex); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorSettingIndex(e); + throw new RuntimeException(e.getMessage(), e); + } + this.readerIndex = readerIndex; + } + + public void clear() + { + } + + public boolean readable() + { + return true; + } + + public boolean writable() + { + return false; + } + + public int readableBytes() + { + long readableBytes = totalSize - readerIndex; + + if (readableBytes > Integer.MAX_VALUE) + { + return Integer.MAX_VALUE; + } + else + { + return (int)(totalSize - readerIndex); + } + } + + public int writableBytes() + { + return 0; + } + + public void markReaderIndex() + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void resetReaderIndex() + { + try + { + checkForPacket(0); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorReSettingIndex(e); + throw new RuntimeException(e.getMessage(), e); + } + } + + public void markWriterIndex() + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void resetWriterIndex() + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void discardReadBytes() + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public short getUnsignedByte(final int index) + { + return (short)(getByte(index) & 0xFF); + } + + public int getUnsignedShort(final int index) + { + return getShort(index) & 0xFFFF; + } + + public int getMedium(final int index) + { + int value = getUnsignedMedium(index); + if ((value & 0x800000) != 0) + { + value |= 0xff000000; + } + return value; + } + + public long getUnsignedInt(final int index) + { + return getInt(index) & 0xFFFFFFFFL; + } + + public void 
getBytes(int index, final byte[] dst) + { + // TODO: optimize this by using System.arraycopy + for (int i = 0; i < dst.length; i++) + { + dst[i] = getByte(index++); + } + } + + public void getBytes(long index, final byte[] dst) + { + // TODO: optimize this by using System.arraycopy + for (int i = 0; i < dst.length; i++) + { + dst[i] = getByte(index++); + } + } + + public void getBytes(final int index, final HornetQBuffer dst) + { + getBytes(index, dst, dst.writableBytes()); + } + + public void getBytes(final int index, final HornetQBuffer dst, final int length) + { + if (length > dst.writableBytes()) + { + throw new IndexOutOfBoundsException(); + } + getBytes(index, dst, dst.writerIndex(), length); + dst.writerIndex(dst.writerIndex() + length); + } + + public void setBytes(final int index, final byte[] src) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void setBytes(final int index, final HornetQBuffer src) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void setBytes(final int index, final HornetQBuffer src, final int length) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void setZero(final int index, final int length) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public short readUnsignedByte() + { + return (short)(readByte() & 0xFF); + } + + public short readShort() + { + short v = getShort(readerIndex); + readerIndex += 2; + return v; + } + + public int readUnsignedShort() + { + return readShort() & 0xFFFF; + } + + public int readMedium() + { + int value = readUnsignedMedium(); + if ((value & 0x800000) != 0) + { + value |= 0xff000000; + } + return value; + } + + public int readUnsignedMedium() + { + int v = getUnsignedMedium(readerIndex); + readerIndex += 3; + return v; + } + + public int readInt() + { + int v = getInt(readerIndex); + readerIndex 
+= 4; + return v; + } + + public int readInt(final int pos) + { + int v = getInt(pos); + return v; + } + + public long readUnsignedInt() + { + return readInt() & 0xFFFFFFFFL; + } + + public long readLong() + { + long v = getLong(readerIndex); + readerIndex += 8; + return v; + } + + public void readBytes(final byte[] dst, final int dstIndex, final int length) + { + getBytes(readerIndex, dst, dstIndex, length); + readerIndex += length; + } + + public void readBytes(final byte[] dst) + { + readBytes(dst, 0, dst.length); + } + + public void readBytes(final HornetQBuffer dst) + { + readBytes(dst, dst.writableBytes()); + } + + public void readBytes(final HornetQBuffer dst, final int length) + { + if (length > dst.writableBytes()) + { + throw new IndexOutOfBoundsException(); + } + readBytes(dst, dst.writerIndex(), length); + dst.writerIndex(dst.writerIndex() + length); + } + + public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length) + { + getBytes(readerIndex, dst, dstIndex, length); + readerIndex += length; + } + + public void readBytes(final ByteBuffer dst) + { + int length = dst.remaining(); + getBytes(readerIndex, dst); + readerIndex += length; + } + + public int readBytes(final GatheringByteChannel out, final int length) throws IOException + { + int readBytes = getBytes((int)readerIndex, out, length); + readerIndex += readBytes; + return readBytes; + } + + public void readBytes(final OutputStream out, final int length) throws IOException + { + getBytes(readerIndex, out, length); + readerIndex += length; + } + + public void skipBytes(final int length) + { + + long newReaderIndex = readerIndex + length; + checkForPacket(newReaderIndex); + readerIndex = newReaderIndex; + } + + public void writeByte(final byte value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } + + public void writeShort(final short value) + { + throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE); + } 
// Relative writers: rejected, they always throw IllegalAccessError with the read-only message

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeMedium(final int value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeInt(final int value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeLong(final long value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final byte[] src, final int srcIndex, final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final byte[] src)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final HornetQBuffer src)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final HornetQBuffer src, final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final ByteBuffer src)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public int writeBytes(final InputStream in, final int length) throws IOException
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeZero(final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public ByteBuffer toByteBuffer()
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public ByteBuffer[] toByteBuffers()
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public ByteBuffer[] toByteBuffers(final int index, final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public String toString(final String charsetName)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** @return this object itself */
public Object getUnderlyingBuffer()
{
   return this;
}

// Typed relative readers ------------------------------------------

/** Reads one byte and returns {@code true} when it is non-zero. */
@Override
public boolean readBoolean()
{
   return readByte() != 0;
}

/** Reads a 16-bit value and returns it as a {@code char}. */
@Override
public char readChar()
{
   return (char)readShort();
}

/** @return the 16-bit value at {@code index} as a {@code char} */
public char getChar(final int index)
{
   return (char)getShort(index);
}

/** @return the 64-bit value at {@code index} interpreted as IEEE 754 double bits */
public double getDouble(final int index)
{
   return Double.longBitsToDouble(getLong(index));
}

/** @return the 32-bit value at {@code index} interpreted as IEEE 754 float bits */
public float getFloat(final int index)
{
   return Float.intBitsToFloat(getInt(index));
}

/**
 * Reads {@code length} bytes at the reader index into a new array, advances the reader index
 * and returns the array wrapped by {@code HornetQBuffers.wrappedBuffer}.
 */
public HornetQBuffer readBytes(final int length)
{
   byte[] bytesToGet = new byte[length];
   getBytes(readerIndex, bytesToGet);
   readerIndex += length;
   return HornetQBuffers.wrappedBuffer(bytesToGet);
}

/** Reads a 64-bit value and interprets it as IEEE 754 double bits. */
@Override
public double readDouble()
{
   return Double.longBitsToDouble(readLong());
}

/** Reads a 32-bit value and interprets it as IEEE 754 float bits. */
@Override
public float readFloat()
{
   return Float.intBitsToFloat(readInt());
}

/**
 * Reads a marker byte; returns {@code null} when it equals {@code DataConstants.NULL},
 * otherwise reads and returns a {@link SimpleString}.
 */
@Override
public SimpleString readNullableSimpleString()
{
   int b = readByte();
   if (b == DataConstants.NULL)
   {
      return null;
   }
   else
   {
      return readSimpleString();
   }
}

/**
 * Reads a marker byte; returns {@code null} when it equals {@code DataConstants.NULL},
 * otherwise reads and returns a {@link String}.
 */
@Override
public String readNullableString()
{
   int b = readByte();
   if (b == DataConstants.NULL)
   {
      return null;
   }
   else
   {
      return readString();
   }
}

/** Reads an {@code int} byte count followed by that many bytes and wraps them in a {@link SimpleString}. */
@Override
public SimpleString readSimpleString()
{
   int len = readInt();
   byte[] data = new byte[len];
   readBytes(data);
   return new SimpleString(data);
}

/**
 * Reads an {@code int} length and then decodes the string with one of three encodings selected
 * by that length: below 9 as a sequence of 16-bit chars, below 0xfff through {@link #readUTF()},
 * otherwise as a {@link SimpleString}.
 */
@Override
public String readString()
{
   int len = readInt();

   if (len < 9)
   {
      char[] chars = new char[len];
      for (int i = 0; i < len; i++)
      {
         chars[i] = (char)readShort();
      }
      return new String(chars);
   }
   else if (len < 0xfff)
   {
      return readUTF();
   }
   else
   {
      return readSimpleString().toString();
   }
}

/** Decodes a string from this buffer through {@code UTF8Util.readUTF}. */
@Override
public String readUTF()
{
   return UTF8Util.readUTF(this);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeBoolean(final boolean val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeChar(final char val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeDouble(final double val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);

}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeFloat(final float val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);

}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeNullableSimpleString(final SimpleString val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeNullableString(final String val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeSimpleString(final SimpleString val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeString(final String val)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
@Override
public void writeUTF(final String utf)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link UnsupportedOperationException}. */
public HornetQBuffer copy()
{
   throw new UnsupportedOperationException();
}

/** Not supported; always throws {@link UnsupportedOperationException}. */
public HornetQBuffer slice(final int index, final int length)
{
   throw new UnsupportedOperationException();
}

// Packet handling ------------------------------------------------

/**
 * Writes the chunk of {@code packet} to {@code output}. When the packet is the last one
 * ({@code !packet.isContinues()}) the stream is marked as ended and {@code output} is closed.
 *
 * @param output destination stream
 * @param packet packet whose chunk is written
 * @throws HornetQException if writing or closing {@code output} fails
 */
private void sendPacketToOutput(final OutputStream output, final LargeData packet) throws HornetQException
{
   try
   {
      output.write(packet.getChunk());
      if (!packet.isContinues())
      {
         streamEnded = true;
         output.close();
      }
   }
   catch (IOException e)
   {
      throw HornetQClientMessageBundle.BUNDLE.errorWritingLargeMessage(e);
   }
}

/**
 * Makes the next queued packet the current one and moves the packet window forward.
 * <p>
 * The poll waits at most {@code readTimeout} <em>milliseconds</em>: the same field is passed to
 * {@code Object.wait(long)} and added to {@code System.currentTimeMillis()} elsewhere in this
 * class, so it is a millisecond value. Polling with {@code TimeUnit.SECONDS} stretched the wait
 * by a factor of 1000.
 *
 * @throws IndexOutOfBoundsException if the stream has already ended, if no packet arrives in
 *                                   time, or if the packet without a chunk (the interruption
 *                                   signal) is received
 * @throws RuntimeException          wrapping any other failure; when the cause is an
 *                                   {@link InterruptedException} the interrupt flag of the
 *                                   current thread is restored first
 */
private void popPacket()
{
   try
   {

      if (streamEnded)
      {
         // no more packets, we are over the last one already
         throw new IndexOutOfBoundsException();
      }

      // the window advances by the size of the packet being left; 1 when there is no current packet
      int sizeToAdd = currentPacket != null ? currentPacket.chunk.length : 1;
      currentPacket = largeMessageData.poll(readTimeout, TimeUnit.MILLISECONDS);
      if (currentPacket == null)
      {
         throw new IndexOutOfBoundsException();
      }

      if (currentPacket.chunk == null) // Empty packet as a signal to interruption
      {
         currentPacket = null;
         streamEnded = true;
         throw new IndexOutOfBoundsException();
      }

      consumerInternal.flowControl(currentPacket.getFlowControlSize(), !currentPacket.isContinues());

      packetPosition += sizeToAdd;

      packetLastPosition = packetPosition + currentPacket.getChunk().length;
   }
   catch (IndexOutOfBoundsException e)
   {
      throw e;
   }
   catch (Exception e)
   {
      if (e instanceof InterruptedException)
      {
         // keep the interruption visible to the callers up the stack
         Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
   }
}

/**
 * Validates a read position and polls packets until the one covering {@code index} is current
 * or the stream has ended.
 *
 * @param index absolute position about to be read
 * @throws IllegalAccessError        if an output stream was set, if the stream was closed, or if
 *                                   there is no file cache and {@code index} is lower than the
 *                                   highest index checked so far
 * @throws IndexOutOfBoundsException if {@code index} is not lower than the total size
 */
private void checkForPacket(final long index)
{
   if (outStream != null)
   {
      throw new IllegalAccessError("Can't read the messageBody after setting outputStream");
   }

   if (index >= totalSize)
   {
      throw new IndexOutOfBoundsException();
   }

   if (streamClosed)
   {
      throw new IllegalAccessError("The consumer associated with this large message was closed before the body was read");
   }

   if (fileCache == null)
   {
      // without a cache already consumed data is gone, so reads may only move forward
      if (index < lastIndex)
      {
         throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
      }
      lastIndex = index;
   }

   while (index >= packetLastPosition && !streamEnded)
   {
      popPacket();
   }
}

/**
 * File-backed store of received chunks with a single in-memory read window of
 * {@code bufferSize} bytes. The file is opened for each operation and closed right after it.
 */
private final class FileCache
{

   public FileCache(final File cachedFile)
   {
      this.cachedFile = cachedFile;
   }

   /** In-memory window over the cache file; allocated on first read. */
   ByteBuffer readCache;

   /** File position of the first byte held in {@link #readCache}. */
   long readCachePositionStart = Integer.MAX_VALUE;

   /** File position of the last byte held in {@link #readCache}. */
   long readCachePositionEnd = -1;

   private final File cachedFile;

   private volatile RandomAccessFile cachedRAFile;

   private volatile FileChannel cachedChannel;

   /**
    * Loads the {@code bufferSize}-aligned window containing {@code position} unless the current
    * window already covers it. The file is closed again before returning.
    *
    * @throws RuntimeException wrapping any failure (the failure is logged first)
    */
   private synchronized void readCache(final long position)
   {

      try
      {
         if (position < readCachePositionStart || position > readCachePositionEnd)
         {

            checkOpen();

            if (position > cachedChannel.size())
            {
               throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
            }

            // round down to the start of the window containing the position
            readCachePositionStart = position / bufferSize * bufferSize;

            cachedChannel.position(readCachePositionStart);

            if (readCache == null)
            {
               readCache = ByteBuffer.allocate(bufferSize);
            }

            readCache.clear();

            readCachePositionEnd = readCachePositionStart + cachedChannel.read(readCache) - 1;
         }
      }
      catch (Exception e)
      {
         HornetQClientLogger.LOGGER.errorReadingCache(e);
         throw new RuntimeException(e.getMessage(), e);
      }
      finally
      {
         close();
      }
   }

   /**
    * @return the byte stored at {@code position} of the cache file
    */
   public synchronized byte getByteFromCache(final long position)
   {
      readCache(position);

      return readCache.get((int)(position - readCachePositionStart));

   }

   /**
    * Appends {@code body} to the end of the cache file. The file is closed afterwards even when
    * the write fails, so a failed write does not leave the file handle open.
    */
   public void cachePackage(final byte[] body) throws Exception
   {
      checkOpen();

      try
      {
         cachedChannel.position(cachedChannel.size());
         cachedChannel.write(ByteBuffer.wrap(body));
      }
      finally
      {
         close();
      }
   }

   /**
    * Opens the cache file for read/write unless an open channel is already available.
    * <p>
    * The check is made on the channel: testing {@code cachedFile != null} was always true for a
    * real file (forcing a reopen over a possibly still open handle) and fell through to a
    * dereference of a {@code null} channel otherwise.
    *
    * @throws FileNotFoundException if the cache file cannot be opened
    */
   public void checkOpen() throws FileNotFoundException
   {
      if (cachedChannel == null || !cachedChannel.isOpen())
      {
         cachedRAFile = new RandomAccessFile(cachedFile, "rw");

         cachedChannel = cachedRAFile.getChannel();
      }
   }

   /**
    * Closes the channel and the underlying file if they are open. Failures are logged and
    * otherwise ignored, so this method never throws.
    */
   public void close()
   {
      if (cachedChannel != null && cachedChannel.isOpen())
      {
         try
         {
            cachedChannel.close();
         }
         catch (Exception e)
         {
            HornetQClientLogger.LOGGER.errorClosingCache(e);
         }
         cachedChannel = null;
      }

      if (cachedRAFile != null)
      {
         try
         {
            cachedRAFile.close();
         }
         catch (Exception e)
         {
            HornetQClientLogger.LOGGER.errorClosingCache(e);
         }
         cachedRAFile = null;
      }

   }

   /**
    * Closes the file and deletes it from disk when this cache is garbage collected.
    */
   @Override
   protected void finalize()
   {
      close();
      if (cachedFile != null && cachedFile.exists())
      {
         try
         {
            cachedFile.delete();
         }
         catch (Exception e)
         {
            HornetQClientLogger.LOGGER.errorFinalisingCache(e);
         }
      }
   }

}

/** @return always {@code null} */
public ByteBuf byteBuf()
{
   return null;
}

/** Not supported; always throws {@link IllegalAccessError}. */
public HornetQBuffer copy(final int index, final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public HornetQBuffer duplicate()
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public HornetQBuffer readSlice(final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void setChar(final int index, final char value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void setDouble(final int index, final double value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void setFloat(final int index, final float value)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public HornetQBuffer slice()
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/** Not supported; always throws {@link IllegalAccessError}. */
public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
{
   throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
}

/**
 * Immutable holder for one received packet: the body chunk, the size to report to flow control
 * and whether more packets follow.
 */
private static class LargeData
{
   final byte[] chunk;
   final int flowControlSize;
   final boolean continues;

   /**
    * Creates the packet without a chunk; {@code popPacket} treats a {@code null} chunk as the
    * signal that the stream was interrupted.
    */
   public LargeData()
   {
      continues = false;
      flowControlSize = 0;
      chunk = null;
   }

   public LargeData(byte[] chunk, int flowControlSize, boolean continues)
   {
      this.chunk = chunk;
      this.flowControlSize = flowControlSize;
      this.continues = continues;
   }

   public byte[] getChunk()
   {
      return chunk;
   }

   public int getFlowControlSize()
   {
      return flowControlSize;
   }

   public boolean isContinues()
   {
      return continues;
   }
}
// closes the enclosing class, whose header lies outside this part of the file
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/QueueQueryImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/QueueQueryImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/QueueQueryImpl.java new file mode 100644 index 0000000..8ced2c1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/QueueQueryImpl.java @@ -0,0 +1,99 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientSession; + +public class QueueQueryImpl implements ClientSession.QueueQuery +{ + + private final boolean exists; + + private final boolean durable; + + private final boolean temporary; + + private final long messageCount; + + private final SimpleString filterString; + + private final int consumerCount; + + private final SimpleString address; + + private final SimpleString name; + + public QueueQueryImpl(final boolean durable, + final boolean temporary, + final int consumerCount, + final long messageCount, + final SimpleString filterString, + final SimpleString address, + final SimpleString name, + final boolean exists) + { + + this.durable = durable; + this.temporary = temporary; + this.consumerCount = consumerCount; + this.messageCount = messageCount; + this.filterString = filterString; + this.address = address; + this.name = name; + this.exists = exists; + } + + public SimpleString getName() + { + return name; + } + + public SimpleString getAddress() + { + return address; + } + + public int getConsumerCount() + { + return consumerCount; + } + + public SimpleString getFilterString() + { + return filterString; + } + + public long getMessageCount() + { + return messageCount; + } + + public boolean isDurable() + { + return durable; + } + + public boolean isTemporary() + { + return temporary; + } + + public boolean isExists() + { + return exists; + } + +} +
