Updated Branches: refs/heads/trunk e366917ef -> 90104943b
https://issues.apache.org/jira/browse/AMQ-4839 Deprecated the streams functionality. Remove in a later release. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/90104943 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/90104943 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/90104943 Branch: refs/heads/trunk Commit: 90104943b28cac1a95b037fae8e338ffe04398fd Parents: e366917 Author: Timothy Bish <[email protected]> Authored: Tue Jan 21 15:00:16 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Jan 21 15:00:16 2014 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQConnection.java | 22 +++++++++++++++++++- .../apache/activemq/ActiveMQInputStream.java | 1 + .../apache/activemq/ActiveMQOutputStream.java | 1 + .../org/apache/activemq/StreamConnection.java | 9 ++++---- .../activemq/ActiveMQInputStreamTest.java | 4 ++++ .../activemq/streams/JMSInputStreamTest.java | 1 + 6 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index ec4d38a..a722e0b 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -170,9 +170,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private final AtomicBoolean transportFailed = new AtomicBoolean(false); private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); + private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); + // Stream are deprecated and will be removed in a later release. private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>(); private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>(); - private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); // Maps ConsumerIds to ActiveMQConsumer objects private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); @@ -677,6 +678,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon ActiveMQConnectionConsumer c = i.next(); c.dispose(); } + // Stream are deprecated and will be removed in a later release. for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { ActiveMQInputStream c = i.next(); c.dispose(); @@ -1598,6 +1600,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon ActiveMQConnectionConsumer c = i.next(); c.dispose(); } + + // Stream are deprecated and will be removed in a later release. for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) { ActiveMQInputStream c = i.next(); c.dispose(); @@ -2186,45 +2190,54 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } @Override + @Deprecated public InputStream createInputStream(Destination dest) throws JMSException { return createInputStream(dest, null); } @Override + @Deprecated public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException { return createInputStream(dest, messageSelector, false); } @Override + @Deprecated public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException { return createInputStream(dest, messageSelector, noLocal, -1); } @Override + @Deprecated public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); } @Override + @Deprecated public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { return createInputStream(dest, null, false); } @Override + @Deprecated public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException { return createDurableInputStream(dest, name, messageSelector, false); } @Override + @Deprecated public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException { return createDurableInputStream(dest, name, messageSelector, noLocal, -1); } @Override + @Deprecated public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); } + @Deprecated private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -2236,6 +2249,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * to disk/database by the broker */ @Override + @Deprecated public OutputStream createOutputStream(Destination dest) throws JMSException { return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); } @@ -2244,6 +2258,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * Creates a non persistent output stream; messages will not be written to * disk */ + @Deprecated public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException { return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE); } @@ -2261,6 +2276,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * method */ @Override + @Deprecated public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -2338,18 +2354,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } } + @Deprecated public void addOutputStream(ActiveMQOutputStream stream) { outputStreams.add(stream); } + @Deprecated public void removeOutputStream(ActiveMQOutputStream stream) { outputStreams.remove(stream); } + @Deprecated public void addInputStream(ActiveMQInputStream stream) { inputStreams.add(stream); } + @Deprecated public void removeInputStream(ActiveMQInputStream stream) { inputStreams.remove(stream); } http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java index 4937431..a895421 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java @@ -43,6 +43,7 @@ import org.apache.activemq.util.JMSExceptionSupport; /** * */ +@Deprecated public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher { private final ActiveMQConnection connection; http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java index 0427c9d..c6e400d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQOutputStream.java @@ -36,6 +36,7 @@ import org.apache.activemq.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated public class ActiveMQOutputStream extends OutputStream implements Disposable { private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class); http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java b/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java index 3684719..2f75b4f 100644 --- a/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java @@ -31,9 +31,10 @@ import javax.jms.Topic; * Destination in using standard java InputStream and OutputStream objects. It's * best use case is to send and receive large amounts of data that would be to * large to hold in a single JMS message. - * - * + * + * */ +@Deprecated public interface StreamConnection extends Connection { InputStream createInputStream(Destination dest) throws JMSException; @@ -49,7 +50,7 @@ public interface StreamConnection extends Connection { InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException; InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException; - + InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException; OutputStream createOutputStream(Destination dest) throws JMSException; @@ -67,7 +68,7 @@ public interface StreamConnection extends Connection { * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed * message is part of a pending transaction or has not been acknowledged in * the session. - * + * * @param name the name used to identify this subscription * @throws JMSException if the session fails to unsubscribe to the durable * subscription due to some internal error. http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java index ce628ad..77f422e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java @@ -23,11 +23,13 @@ import javax.jms.Queue; import javax.jms.Session; import junit.framework.TestCase; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated public class ActiveMQInputStreamTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class); @@ -39,6 +41,7 @@ public class ActiveMQInputStreamTest extends TestCase { private BrokerService broker; private String connectionUri; + @Override public void setUp() throws Exception { broker = new BrokerService(); broker.setUseJmx(false); @@ -53,6 +56,7 @@ public class ActiveMQInputStreamTest extends TestCase { connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } + @Override public void tearDown() throws Exception { broker.stop(); broker.waitUntilStopped(); http://git-wip-us.apache.org/repos/asf/activemq/blob/90104943/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java index 25b514a..f392662 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQTopic; /** * JMSInputStreamTest */ +@Deprecated public class JMSInputStreamTest extends JmsTestSupport { public Destination destination;
