Author: rajdavies
Date: Mon Aug 3 07:42:43 2009
New Revision: 800235
URL: http://svn.apache.org/viewvc?rev=800235&view=rev
Log:
Added patch for https://issues.apache.org/activemq/browse/AMQ-1744
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
(with props)
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
Modified: activemq/trunk/activemq-core/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Mon Aug 3 07:42:43 2009
@@ -140,6 +140,11 @@
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+
+</dependency>
<!-- not really a dependency at all - just added optionally to get the
generator working -->
@@ -460,6 +465,10 @@
<!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->
<exclude>**/ProxyConnectorTest.*</exclude>
+ <!-- FTPBlob tests need FTP server running -->
+
+ <exclude>**/FTPBlob*/</exclude>
+
</excludes>
</configuration>
</plugin>
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Aug 3 07:42:43 2009
@@ -16,29 +16,12 @@
*/
package org.apache.activemq;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@@ -57,6 +40,24 @@
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
/**
* A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -485,6 +486,9 @@
*/
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md)
throws JMSException {
ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
+ if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+ ((ActiveMQBlobMessage)m).setBlobDownloader(new
BlobDownloader(session.getBlobTransferPolicy()));
+ }
if (transformer != null) {
Message transformedMessage =
transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
Mon Aug 3 07:42:43 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.net.MalformedURLException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
@@ -32,6 +33,9 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
@@ -165,6 +169,17 @@
msg.setConnection(connection);
msg.setText(textMsg.getText());
activeMessage = msg;
+ } else if (message instanceof BlobMessage) {
+ BlobMessage blobMessage = (BlobMessage)message;
+ ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
+ msg.setConnection(connection);
+ msg.setBlobDownloader(new
BlobDownloader(connection.getBlobTransferPolicy()));
+ try {
+ msg.setURL(blobMessage.getURL());
+ } catch (MalformedURLException e) {
+
+ }
+ activeMessage = msg;
} else {
activeMessage = new ActiveMQMessage();
activeMessage.setConnection(connection);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon Aug 3 07:42:43 2009
@@ -42,6 +42,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.blob.BlobDownloader;
/**
* <P>
@@ -410,6 +411,7 @@
configureMessage(message);
message.setURL(url);
message.setDeletedByBroker(deletedByBroker);
+ message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
return message;
}
@@ -430,6 +432,7 @@
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
configureMessage(message);
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(),
file));
+ message.setBlobDownloader(new
BlobDownloader((getBlobTransferPolicy())));
message.setDeletedByBroker(true);
message.setName(file.getName());
return message;
@@ -452,6 +455,7 @@
ActiveMQBlobMessage message = new ActiveMQBlobMessage();
configureMessage(message);
message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
+ message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
message.setDeletedByBroker(true);
return message;
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Represents a strategy of downloading a file/stream from some remote URL
+ */
+public interface BlobDownloadStrategy {
+
+ InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException;
+
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+
+/**
+ * Mediator for Blob Download
+ */
+public class BlobDownloader {
+
+ private BlobTransferPolicy blobTransferPolicy;
+
+ public BlobDownloader(BlobTransferPolicy transferPolicy) {
+ this.blobTransferPolicy = transferPolicy;
+ }
+
+ public InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException {
+ return getStrategy().getInputStream(message);
+ }
+
+ public BlobTransferPolicy getBlobTransferPolicy() {
+ return blobTransferPolicy;
+ }
+
+ public BlobDownloadStrategy getStrategy() {
+ return getBlobTransferPolicy().getDownloadStrategy();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
Mon Aug 3 07:42:43 2009
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.blob;
+import java.net.MalformedURLException;
+import java.net.URL;
+
/**
* The policy for configuring how BLOBs (Binary Large OBjects) are transferred
* out of band between producers, brokers and consumers.
@@ -28,6 +31,7 @@
private String uploadUrl;
private int bufferSize = 128 * 1024;
private BlobUploadStrategy uploadStrategy;
+ private BlobDownloadStrategy downloadStrategy;
/**
* Returns a copy of this policy object
@@ -90,6 +94,13 @@
return uploadStrategy;
}
+ public BlobDownloadStrategy getDownloadStrategy() {
+ if(downloadStrategy == null) {
+ downloadStrategy = createDownloadStrategy();
+ }
+ return downloadStrategy;
+ }
+
/**
* Sets the upload strategy to use for uploading BLOBs to some URL
*/
@@ -108,7 +119,49 @@
this.bufferSize = bufferSize;
}
+ /**
+ * Returns the upload strategy depending on the information from the
+ * uploadURL. Currently supports HTTP and FTP.
+ *
+ * @return
+ */
protected BlobUploadStrategy createUploadStrategy() {
- return new DefaultBlobUploadStrategy(this);
+ BlobUploadStrategy strategy;
+ try {
+ URL url = new URL(getUploadUrl());
+
+ if(url.getProtocol().equalsIgnoreCase("FTP")) {
+ strategy = new FTPBlobUploadStrategy(this);
+ } else {
+ strategy = new DefaultBlobUploadStrategy(this);
}
+ } catch (MalformedURLException e) {
+ strategy = new DefaultBlobUploadStrategy(this);
+}
+ return strategy;
+ }
+
+ /**
+ * Returns the download strategy depending on the information from the
+ * uploadURL. Currently supports HTTP and FTP.
+ *
+ * @return
+ */
+ protected BlobDownloadStrategy createDownloadStrategy() {
+ BlobDownloadStrategy strategy;
+ try {
+ URL url = new URL(getUploadUrl());
+
+ if(url.getProtocol().equalsIgnoreCase("FTP")) {
+ strategy = new FTPBlobDownloadStrategy();
+ } else {
+ strategy = new DefaultBlobDownloadStrategy();
+ }
+ } catch (MalformedURLException e) {
+ strategy = new DefaultBlobDownloadStrategy();
+ }
+ return strategy;
+ }
+
+
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A default implementation of {@link BlobDownloadStrategy} which uses the
URL
+ * class to download files or streams from a remote URL
+ */
+public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
+
+ public InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException {
+ URL value = message.getURL();
+ if (value == null) {
+ return null;
+ }
+ return value.openStream();
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation for {@link BlobDownloadStrategy}.
+ */
+public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
+ private String ftpUser;
+ private String ftpPass;
+
+ public InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException {
+ URL url = message.getURL();
+
+ setUserInformation(url.getUserInfo());
+ String connectUrl = url.getHost();
+ int port = url.getPort() < 1 ? 21 : url.getPort();
+
+ FTPClient ftp = new FTPClient();
+ try {
+ ftp.connect(connectUrl, port);
+ } catch(ConnectException e) {
+ throw new JMSException("Problem connecting the FTP-server");
+ }
+
+ if(!ftp.login(ftpUser, ftpPass)) {
+ ftp.quit();
+ ftp.disconnect();
+ throw new JMSException("Cant Authentificate to FTP-Server");
+ }
+ String path = url.getPath();
+ String workingDir = path.substring(0, path.lastIndexOf("/"));
+ String file = path.substring(path.lastIndexOf("/")+1);
+
+ ftp.changeWorkingDirectory(workingDir);
+ ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+ InputStream input = ftp.retrieveFileStream(file);
+ ftp.quit();
+ ftp.disconnect();
+
+ return input;
+ }
+
+ private void setUserInformation(String userInfo) {
+ if(userInfo != null) {
+ String[] userPass = userInfo.split(":");
+ if(userPass.length > 0) this.ftpUser = userPass[0];
+ if(userPass.length > 1) this.ftpPass = userPass[1];
+ } else {
+ this.ftpUser = "anonymous";
+ this.ftpPass = "anonymous";
+ }
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation of {@link BlobUploadStrategy}.
+ */
+public class FTPBlobUploadStrategy implements BlobUploadStrategy {
+
+ private URL url;
+ private String ftpUser = "";
+ private String ftpPass = "";
+ private BlobTransferPolicy transferPolicy;
+
+ public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws
MalformedURLException {
+ this.transferPolicy = transferPolicy;
+ this.url = new URL(this.transferPolicy.getUploadUrl());
+
+ setUserInformation(url.getUserInfo());
+ }
+
+ public URL uploadFile(ActiveMQBlobMessage message, File file)
+ throws JMSException, IOException {
+ return uploadStream(message, new FileInputStream(file));
+ }
+
+ public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
+ throws JMSException, IOException {
+ String connectUrl = url.getHost();
+ int port = url.getPort() < 1 ? 21 : url.getPort();
+
+ FTPClient ftp = new FTPClient();
+ try {
+ ftp.connect(connectUrl, port);
+ } catch(ConnectException e) {
+ throw new JMSException("Problem connecting the FTP-server");
+ }
+ if(!ftp.login(ftpUser, ftpPass)) {
+ ftp.quit();
+ ftp.disconnect();
+ throw new JMSException("Cant Authentificate to
FTP-Server");
+ }
+ String path = url.getPath();
+ String workingDir = path.substring(0, path.lastIndexOf("/"));
+ String filename =
message.getMessageId().toString().replaceAll(":", "_");
+ ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+
+ String url;
+ if(!ftp.changeWorkingDirectory(workingDir)) {
+ url = this.url.toString().replaceFirst(this.url.getPath(),
"")+"/";
+ } else {
+ url = this.url.toString();
+ }
+
+ ftp.storeFile(filename, in);
+ ftp.quit();
+ ftp.disconnect();
+
+ return new URL(url + filename);
+ }
+
+ private void setUserInformation(String userInfo) {
+ if(userInfo != null) {
+ String[] userPass = userInfo.split(":");
+ if(userPass.length > 0) this.ftpUser = userPass[0];
+ if(userPass.length > 1) this.ftpPass = userPass[1];
+ } else {
+ this.ftpUser = "anonymous";
+ this.ftpPass = "anonymous";
+ }
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
Mon Aug 3 07:42:43 2009
@@ -24,6 +24,7 @@
import javax.jms.JMSException;
import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobUploader;
import org.apache.activemq.util.JMSExceptionSupport;
@@ -44,6 +45,7 @@
private boolean deletedByBroker;
private transient BlobUploader blobUploader;
+ private transient BlobDownloader blobDownloader;
private transient URL url;
public Message copy() {
@@ -123,11 +125,10 @@
}
public InputStream getInputStream() throws IOException, JMSException {
- URL value = getURL();
- if (value == null) {
+ if(blobDownloader == null) {
return null;
}
- return value.openStream();
+ return blobDownloader.getInputStream(this);
}
public URL getURL() throws JMSException {
@@ -154,6 +155,14 @@
this.blobUploader = blobUploader;
}
+ public BlobDownloader getBlobDownloader() {
+ return blobDownloader;
+ }
+
+ public void setBlobDownloader(BlobDownloader blobDownloader) {
+ this.blobDownloader = blobDownloader;
+ }
+
public void onSend() throws JMSException {
super.onSend();
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq.
+ * Also a file called test.txt with the content <b>hello world</b> must be in
the ftptest directory.
+ */
+public class FTPBlobDownloadStrategyTest extends TestCase {
+
+ public void xtestDownload() {
+ ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+ BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+ InputStream stream;
+ try {
+ message.setURL(new
URL("ftp://activemq:activ...@localhost/ftptest/test.txt"));
+ stream = strategy.getInputStream(message);
+ int i = stream.read();
+ StringBuilder sb = new StringBuilder(10);
+ while(i != -1) {
+ sb.append((char)i);
+ i = stream.read();
+ }
+ Assert.assertEquals("hello world", sb.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue(false);
+ }
+ }
+
+ public void xtestWrongAuthentification() {
+ ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+ BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+ try {
+ message.setURL(new
URL("ftp://activemq:activemq_wr...@localhost/ftptest/test.txt"));
+ strategy.getInputStream(message);
+ } catch(JMSException e) {
+ Assert.assertEquals("Wrong Exception", "Cant
Authentificate to FTP-Server", e.getMessage());
+ return;
+ } catch(Exception e) {
+ System.out.println(e);
+ Assert.assertTrue("Wrong Exception "+ e, false);
+ return;
+ }
+
+ Assert.assertTrue("Expect Exception", false);
+ }
+
+ public void xtestWrongFTPPort() {
+ ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+ BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+ try {
+ message.setURL(new
URL("ftp://activemq:activ...@localhost:442/ftptest/test.txt"));
+ strategy.getInputStream(message);
+ } catch(JMSException e) {
+ Assert.assertEquals("Wrong Exception", "Problem
connecting the FTP-server", e.getMessage());
+ return;
+ } catch(Exception e) {
+ e.printStackTrace();
+ Assert.assertTrue("Wrong Exception "+ e, false);
+ return;
+ }
+
+ Assert.assertTrue("Expect Exception", false);
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Sends a blob message whose payload is uploaded to an FTP server and checks
+ * that the consumer can read the same content back through the message's
+ * input stream.
+ * <p>
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobTest extends EmbeddedBrokerTestSupport {
+
+    private ActiveMQConnection connection;
+
+    /**
+     * Points the blob transfer policy at the local FTP server and opens a
+     * started connection.
+     */
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activemq@localhost/ftptest/";
+        super.setUp();
+
+        connection = (ActiveMQConnection) createConnection();
+        connection.start();
+    }
+
+    /**
+     * Closes the connection opened in {@link #setUp()} so it does not outlive
+     * the test, then lets the superclass clean up.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Writes a small temp file, sends it as a blob message and verifies that
+     * the received message streams back exactly the written content.
+     */
+    public void testBlobFile() throws Exception {
+        // first create Message
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        file.deleteOnExit();
+        // lets write some data; the timestamp makes the content differ per run
+        String content = "hello world " + System.currentTimeMillis();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        try {
+            writer.append(content);
+        } finally {
+            writer.close();
+        }
+
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        BlobMessage message = session.createBlobMessage(file);
+
+        producer.send(message);
+        Thread.sleep(1000);
+
+        // check message send
+        Message msg = consumer.receive(1000);
+        Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
+
+        InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+        Assert.assertNotNull(input);
+        StringBuilder received = new StringBuilder();
+        try {
+            // The payload written above is plain ASCII, so a byte-to-char cast is enough.
+            int i = input.read();
+            while (i != -1) {
+                received.append((char) i);
+                i = input.read();
+            }
+        } finally {
+            input.close();
+        }
+        Assert.assertEquals(content, received.toString());
+    }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java?rev=800235&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
Mon Aug 3 07:42:43 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URL;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * Checks that sending a blob message uploads the file below the configured
+ * FTP upload URL and that the message URL is rewritten to that location.
+ * <p>
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport {
+
+    /** Upload location used for the transfer policy, the cleanup and the expected message URL. */
+    private static final String FTP_URL = "ftp://activemq:activemq@localhost/ftptest/";
+
+    private Connection connection;
+
+    /**
+     * Configures the upload URL, opens a started connection and removes any
+     * "testmessage" file left on the FTP server.
+     */
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + FTP_URL;
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+
+        // check if file exist and delete it
+        URL url = new URL(FTP_URL);
+        String connectUrl = url.getHost();
+        // URL.getPort() returns -1 when no port is given; fall back to the FTP default.
+        int port = url.getPort() < 1 ? 21 : url.getPort();
+
+        FTPClient ftp = new FTPClient();
+        ftp.connect(connectUrl, port);
+        try {
+            if (!ftp.login("activemq", "activemq")) {
+                ftp.quit();
+                throw new JMSException("Cant Authentificate to FTP-Server");
+            }
+            ftp.changeWorkingDirectory("ftptest");
+            ftp.deleteFile("testmessage");
+            ftp.quit();
+        } finally {
+            // Always release the control connection, even when login or cleanup throws.
+            ftp.disconnect();
+        }
+    }
+
+    /**
+     * Closes the connection opened in {@link #setUp()}, then lets the
+     * superclass clean up.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates a blob message from a temp file, triggers the upload through
+     * {@code onSend()} and verifies the resulting message URL.
+     */
+    public void testFileUpload() throws Exception {
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        file.deleteOnExit();
+        // lets write some data
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        try {
+            writer.append("hello world");
+        } finally {
+            writer.close();
+        }
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ((ActiveMQConnection) connection).setCopyMessageOnSend(false);
+
+        ActiveMQBlobMessage message = (ActiveMQBlobMessage) ((ActiveMQSession) session).createBlobMessage(file);
+        // A fixed message id gives the expected URL (and the setUp cleanup) a known file name.
+        message.setMessageId(new MessageId("testmessage"));
+        message.onSend();
+
+        Assert.assertEquals(FTP_URL + "testmessage", message.getURL().toString());
+    }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain