Author: rajdavies
Date: Mon Aug  3 07:42:43 2009
New Revision: 800235

URL: http://svn.apache.org/viewvc?rev=800235&view=rev
Log:
Added patch for https://issues.apache.org/activemq/browse/AMQ-1744

Added:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
   (with props)
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
   (with props)
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
   (with props)
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
   (with props)
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
   (with props)
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
   (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Mon Aug  3 07:42:43 2009
@@ -140,6 +140,11 @@
       <artifactId>xalan</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+    <groupId>commons-net</groupId>
+    <artifactId>commons-net</artifactId>
+    
+</dependency>
 
 
     <!-- not really a dependency at all - just added optionally to get the 
generator working -->
@@ -460,6 +465,10 @@
             <!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->
             <exclude>**/ProxyConnectorTest.*</exclude>
             
+            <!-- FTPBlob tests need FTP server running -->
+            
+            <exclude>**/FTPBlob*/</exclude>
+            
           </excludes>
         </configuration>
       </plugin>

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Mon Aug  3 07:42:43 2009
@@ -16,29 +16,12 @@
  */
 package org.apache.activemq;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.MessageAck;
@@ -57,6 +40,24 @@
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 
 /**
  * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
@@ -485,6 +486,9 @@
      */
     private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) 
throws JMSException {
         ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
+        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+               ((ActiveMQBlobMessage)m).setBlobDownloader(new 
BlobDownloader(session.getBlobTransferPolicy()));
+        }
         if (transformer != null) {
             Message transformedMessage = 
transformer.consumerTransform(session, this, m);
             if (transformedMessage != null) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
 Mon Aug  3 07:42:43 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.net.MalformedURLException;
 import java.util.Enumeration;
 
 import javax.jms.BytesMessage;
@@ -32,6 +33,9 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.activemq.blob.BlobDownloader;
+import org.apache.activemq.blob.BlobUploader;
+import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
@@ -165,6 +169,17 @@
                 msg.setConnection(connection);
                 msg.setText(textMsg.getText());
                 activeMessage = msg;
+            } else if (message instanceof BlobMessage) {
+               BlobMessage blobMessage = (BlobMessage)message;
+               ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
+               msg.setConnection(connection);
+               msg.setBlobDownloader(new 
BlobDownloader(connection.getBlobTransferPolicy()));
+               try {
+                                       msg.setURL(blobMessage.getURL());
+                               } catch (MalformedURLException e) {
+                                       
+                               }
+               activeMessage = msg;
             } else {
                 activeMessage = new ActiveMQMessage();
                 activeMessage.setConnection(connection);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 Mon Aug  3 07:42:43 2009
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.blob.BlobDownloader;
 
 /**
  * <P>
@@ -410,6 +411,7 @@
         configureMessage(message);
         message.setURL(url);
         message.setDeletedByBroker(deletedByBroker);
+        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
         return message;
     }
 
@@ -430,6 +432,7 @@
         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
         configureMessage(message);
         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), 
file));
+        message.setBlobDownloader(new 
BlobDownloader((getBlobTransferPolicy())));
         message.setDeletedByBroker(true);
         message.setName(file.getName());
         return message;
@@ -452,6 +455,7 @@
         ActiveMQBlobMessage message = new ActiveMQBlobMessage();
         configureMessage(message);
         message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
+        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
         message.setDeletedByBroker(true);
         return message;
     }

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * Represents a strategy of downloading a file/stream from some remote
+ */
+public interface BlobDownloadStrategy {
+    
+    InputStream getInputStream(ActiveMQBlobMessage message) throws 
IOException, JMSException;
+
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+
+/**
+ * Mediator for Blob Download
+ */
+public class BlobDownloader {
+
+    private BlobTransferPolicy blobTransferPolicy;
+    
+    public BlobDownloader(BlobTransferPolicy transferPolicy) {
+        this.blobTransferPolicy = transferPolicy;
+    }
+    
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws 
IOException, JMSException {
+        return getStrategy().getInputStream(message);
+    }
+    
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        return blobTransferPolicy;
+    }
+    
+    public BlobDownloadStrategy getStrategy() {
+        return getBlobTransferPolicy().getDownloadStrategy();
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobDownloader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/BlobTransferPolicy.java
 Mon Aug  3 07:42:43 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.blob;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
 /**
  * The policy for configuring how BLOBs (Binary Large OBjects) are transferred
  * out of band between producers, brokers and consumers.
@@ -28,6 +31,7 @@
     private String uploadUrl;
     private int bufferSize = 128 * 1024;
     private BlobUploadStrategy uploadStrategy;
+    private BlobDownloadStrategy downloadStrategy;
 
     /**
      * Returns a copy of this policy object
@@ -90,6 +94,13 @@
         return uploadStrategy;
     }
 
+    public BlobDownloadStrategy getDownloadStrategy() {
+        if(downloadStrategy == null) {
+            downloadStrategy = createDownloadStrategy();
+        }
+        return downloadStrategy;
+    }
+
     /**
      * Sets the upload strategy to use for uploading BLOBs to some URL
      */
@@ -108,7 +119,49 @@
         this.bufferSize = bufferSize;
     }
 
+    /**
+     * Returns the upload strategy depending on the information from the
+     * uploadURL. Currently supportet HTTP and FTP
+     * 
+     * @return
+     */
     protected BlobUploadStrategy createUploadStrategy() {
-        return new DefaultBlobUploadStrategy(this);
+       BlobUploadStrategy strategy;
+       try {
+            URL url = new URL(getUploadUrl());
+
+            if(url.getProtocol().equalsIgnoreCase("FTP")) {
+                strategy = new FTPBlobUploadStrategy(this);
+            } else {
+                strategy = new DefaultBlobUploadStrategy(this);
     }
+        } catch (MalformedURLException e) {
+                strategy = new DefaultBlobUploadStrategy(this);
+}
+        return strategy;
+    }
+    
+    /**
+     * Returns the download strategy depending on the information from the
+     * uploadURL. Currently supportet HTTP and FTP
+     * 
+     * @return
+     */
+    protected BlobDownloadStrategy createDownloadStrategy() {
+        BlobDownloadStrategy strategy;
+        try {
+            URL url = new URL(getUploadUrl());
+            
+            if(url.getProtocol().equalsIgnoreCase("FTP")) {
+                strategy = new FTPBlobDownloadStrategy();
+            } else {
+                strategy = new DefaultBlobDownloadStrategy();
+            }
+        } catch (MalformedURLException e) {
+            strategy = new DefaultBlobDownloadStrategy();
+        }
+        return strategy;
+    }
+
+    
 }

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * A default implementation of {...@link BlobDownloadStrategy} which uses the 
URL
+ * class to download files or streams from a remote URL
+ */
+public class DefaultBlobDownloadStrategy implements BlobDownloadStrategy{
+
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws 
IOException, JMSException {
+        URL value = message.getURL();
+        if (value == null) {
+            return null;
+        }
+        return value.openStream();
+    }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/DefaultBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation for {...@link BlobDownloadStrategy}.
+ */
+public class FTPBlobDownloadStrategy implements BlobDownloadStrategy {
+    private String ftpUser;
+    private String ftpPass;
+
+    public InputStream getInputStream(ActiveMQBlobMessage message) throws 
IOException, JMSException {
+        URL url = message.getURL();
+        
+        setUserInformation(url.getUserInfo());
+        String connectUrl = url.getHost();
+        int port = url.getPort() < 1 ? 21 : url.getPort();
+
+        FTPClient ftp = new FTPClient();
+        try {
+               ftp.connect(connectUrl, port);
+        } catch(ConnectException e) {
+               throw new JMSException("Problem connecting the FTP-server");
+        }
+        
+        if(!ftp.login(ftpUser, ftpPass)) {
+               ftp.quit();
+            ftp.disconnect();
+            throw new JMSException("Cant Authentificate to FTP-Server");
+        }
+        String path = url.getPath();
+        String workingDir = path.substring(0, path.lastIndexOf("/"));
+        String file = path.substring(path.lastIndexOf("/")+1);
+        
+        ftp.changeWorkingDirectory(workingDir);
+        ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+        InputStream input = ftp.retrieveFileStream(file);
+        ftp.quit();
+        ftp.disconnect();
+        
+        return input;
+    }
+    
+    private void setUserInformation(String userInfo) {
+        if(userInfo != null) {
+            String[] userPass = userInfo.split(":");
+            if(userPass.length > 0) this.ftpUser = userPass[0];
+            if(userPass.length > 1) this.ftpPass = userPass[1];
+        } else {
+            this.ftpUser = "anonymous";
+            this.ftpPass = "anonymous";
+        }
+    }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * A FTP implementation of {...@link BlobUploadStrategy}.
+ */
+public class FTPBlobUploadStrategy implements BlobUploadStrategy {
+       
+       private URL url;
+       private String ftpUser = "";
+       private String ftpPass = "";
+       private BlobTransferPolicy transferPolicy;
+       
+       public FTPBlobUploadStrategy(BlobTransferPolicy transferPolicy) throws 
MalformedURLException {
+               this.transferPolicy = transferPolicy;
+               this.url = new URL(this.transferPolicy.getUploadUrl());
+               
+               setUserInformation(url.getUserInfo());
+       }
+
+       public URL uploadFile(ActiveMQBlobMessage message, File file)
+                       throws JMSException, IOException {
+               return uploadStream(message, new FileInputStream(file));
+       }
+
+       public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
+                       throws JMSException, IOException {
+               String connectUrl = url.getHost();
+               int port = url.getPort() < 1 ? 21 : url.getPort();
+               
+               FTPClient ftp = new FTPClient();
+               try {
+               ftp.connect(connectUrl, port);
+        } catch(ConnectException e) {
+               throw new JMSException("Problem connecting the FTP-server");
+        }
+               if(!ftp.login(ftpUser, ftpPass)) {
+                       ftp.quit();
+                       ftp.disconnect();
+                       throw new JMSException("Cant Authentificate to 
FTP-Server");
+               }
+               String path = url.getPath();
+        String workingDir = path.substring(0, path.lastIndexOf("/"));
+               String filename = 
message.getMessageId().toString().replaceAll(":", "_");
+        ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
+        
+        String url;
+        if(!ftp.changeWorkingDirectory(workingDir)) {
+               url = this.url.toString().replaceFirst(this.url.getPath(), 
"")+"/";
+        } else {
+               url = this.url.toString();
+        }
+        
+               ftp.storeFile(filename, in);
+               ftp.quit();
+               ftp.disconnect();
+               
+               return new URL(url + filename);
+       }
+       
+       private void setUserInformation(String userInfo) {
+               if(userInfo != null) {
+                       String[] userPass = userInfo.split(":");
+                       if(userPass.length > 0) this.ftpUser = userPass[0];
+                       if(userPass.length > 1) this.ftpPass = userPass[1];
+               } else {
+                       this.ftpUser = "anonymous";
+                       this.ftpPass = "anonymous";
+               }
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobUploadStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java?rev=800235&r1=800234&r2=800235&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBlobMessage.java
 Mon Aug  3 07:42:43 2009
@@ -24,6 +24,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.BlobMessage;
+import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobUploader;
 import org.apache.activemq.util.JMSExceptionSupport;
 
@@ -44,6 +45,7 @@
     private boolean deletedByBroker;
 
     private transient BlobUploader blobUploader;
+    private transient BlobDownloader blobDownloader;
     private transient URL url;
 
     public Message copy() {
@@ -123,11 +125,10 @@
     }
 
     public InputStream getInputStream() throws IOException, JMSException {
-        URL value = getURL();
-        if (value == null) {
+        if(blobDownloader == null) {
             return null;
         }
-        return value.openStream();
+        return blobDownloader.getInputStream(this);
     }
 
     public URL getURL() throws JMSException {
@@ -154,6 +155,14 @@
         this.blobUploader = blobUploader;
     }
 
+    public BlobDownloader getBlobDownloader() {
+        return blobDownloader;
+    }
+
+    public void setBlobDownloader(BlobDownloader blobDownloader) {
+        this.blobDownloader = blobDownloader;
+    }
+
     public void onSend() throws JMSException {
         super.onSend();
 

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.InputStream;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq. 
+ * Also a file called test.txt with the content <b>hello world</b> must be in 
the ftptest directory.
+ */
+public class FTPBlobDownloadStrategyTest extends TestCase {
+       
+       public void xtestDownload() {
+               ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+               BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+               InputStream stream;
+               try {
+                       message.setURL(new 
URL("ftp://activemq:activ...@localhost/ftptest/test.txt";));
+                       stream = strategy.getInputStream(message);
+                       int i = stream.read();
+                       StringBuilder sb = new StringBuilder(10);
+                       while(i != -1) {
+                               sb.append((char)i);
+                               i = stream.read();
+                       }
+                       Assert.assertEquals("hello world", sb.toString());
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.assertTrue(false);
+               }
+       }
+       
+       public void xtestWrongAuthentification() {
+               ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+               BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+               try {
+                       message.setURL(new 
URL("ftp://activemq:activemq_wr...@localhost/ftptest/test.txt";));
+                       strategy.getInputStream(message);
+               } catch(JMSException e) {
+                       Assert.assertEquals("Wrong Exception", "Cant 
Authentificate to FTP-Server", e.getMessage());
+                       return;
+               } catch(Exception e) {
+                       System.out.println(e);
+                       Assert.assertTrue("Wrong Exception "+ e, false);
+                       return;
+               }
+               
+               Assert.assertTrue("Expect Exception", false);
+       }
+       
+       public void xtestWrongFTPPort() {
+               ActiveMQBlobMessage message = new ActiveMQBlobMessage();
+               BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy();
+               try {
+                       message.setURL(new 
URL("ftp://activemq:activ...@localhost:442/ftptest/test.txt";));
+                       strategy.getInputStream(message);
+               } catch(JMSException e) {
+                       Assert.assertEquals("Wrong Exception", "Problem 
connecting the FTP-server", e.getMessage());
+                       return;
+               } catch(Exception e) {
+                       e.printStackTrace();
+                       Assert.assertTrue("Wrong Exception "+ e, false);
+                       return;
+               }
+               
+               Assert.assertTrue("Expect Exception", false);
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobTest extends EmbeddedBrokerTestSupport {
+       
+       private ActiveMQConnection connection;
+
+       protected void setUp() throws Exception {
+               bindAddress = 
"vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activ...@localhost/ftptest/";;
+        super.setUp();
+
+        connection = (ActiveMQConnection) createConnection();
+        connection.start();
+       }
+       
+       public void testBlobFile() throws Exception {
+               // first create Message
+               File file = File.createTempFile("amq-data-file-", ".dat");
+        // lets write some data
+               String content = "hello world "+ System.currentTimeMillis();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.append(content);
+        writer.close();
+        
+        ActiveMQSession session = (ActiveMQSession) 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        BlobMessage message = session.createBlobMessage(file);
+        
+        producer.send(message);
+        Thread.sleep(1000);
+
+        // check message send
+        Message msg = consumer.receive(1000);
+        Assert.assertTrue(msg instanceof ActiveMQBlobMessage);
+
+        InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+        StringBuilder b = new StringBuilder();
+        int i = input.read();
+        while(i != -1) {
+               b.append((char) i);
+               i = input.read();
+        }
+        Assert.assertEquals(content, b.toString());
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java?rev=800235&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
 Mon Aug  3 07:42:43 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URL;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * To start this test make sure an ftp server is running with
+ * user: activemq and password: activemq
+ */
+public class FTPBlobUploadStrategyTest extends EmbeddedBrokerTestSupport {
+       
+       private Connection connection;
+
+       protected void setUp() throws Exception {
+               bindAddress = 
"vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=ftp://activemq:activ...@localhost/ftptest/";;
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        
+        // check if file exist and delete it
+        URL url = new URL("ftp://activemq:activ...@localhost/ftptest/";);
+        String connectUrl = url.getHost();
+               int port = url.getPort() < 1 ? 21 : url.getPort();
+               
+               FTPClient ftp = new FTPClient();
+               ftp.connect(connectUrl, port);
+               if(!ftp.login("activemq", "activemq")) {
+                       ftp.quit();
+                       ftp.disconnect();
+                       throw new JMSException("Cant Authentificate to 
FTP-Server");
+               }
+               ftp.changeWorkingDirectory("ftptest");
+               ftp.deleteFile("testmessage");
+               ftp.quit();
+               ftp.disconnect();
+    }
+       
+       public void testFileUpload() throws Exception {
+               File file = File.createTempFile("amq-data-file-", ".dat");
+        // lets write some data
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.append("hello world");
+        writer.close();
+        
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        ((ActiveMQConnection)connection).setCopyMessageOnSend(false);
+        
+        ActiveMQBlobMessage message = (ActiveMQBlobMessage) 
((ActiveMQSession)session).createBlobMessage(file);
+        message.setMessageId(new MessageId("testmessage"));
+        message.onSend();
+        
Assert.assertEquals("ftp://activemq:activ...@localhost/ftptest/testmessage";, 
message.getURL().toString()); 
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to