Hi, everyone,

    I am using ActiveMQ streams to transfer a large file. When I run the
transfer in one or two threads, it works fine. But if I start more than three
threads to transfer data concurrently, then after some data has been
transferred the sender and receiver threads all end up in a waiting state.
Please give me some help or advice about this. Thanks a lot.

   The following is my sender and receiver code.
Sender:
/**
 * ActivemqFileSender.java
 * Copyright(C) 2006 Agree Tech, All rights reserved.
 * Created on 2006-7-17 ??11:17:16 by wang
 */
package cn.com.agree.eai.file;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQOutputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import cn.com.agree.tools.StringTool;

public class ActivemqFileSender implements Runnable {
        /** ?????? */
        private final static Log logger = LogFactory
                        .getLog(ActivemqFileSender.class);

        public static String DEFAULT_MQ_URL = "tcp://127.0.0.1:61636";

        public static String SUBJECT_NAME = "FILE.TRANS";
        
        /** ???????????????? */
        public long averageSize;

        /** ??ID????id???????????? */
        public int id;
        /** ????????????? */
        public long blockSize;
        
        /**???????????? */
        public int bufferSize;

        /** ????????? */
        public File file;

        // ???activeMQ??????
        /** Active MQ ???? */
        public String url = DEFAULT_MQ_URL;

        /** ??????queue???????topic?? */
        public String name;

        /**
         * true : topic false : queue
         */
        public boolean isTopic = false;

        public String subject ;

        private Connection connection;

        private ActiveMQOutputStream outputStream;      

        /**
         * ???????????id         
         * @param file ????
         * @param id ??Id
         * @param averageSize ??????????????
         * @param blockSize ?????????????????averageSize
         * @param bufferSize ?????
         * @throws JMSException
         */
        public ActivemqFileSender(File file, int id,long averageSize, long
blockSize,int bufferSize) {
                // TODO Auto-generated constructor stub
                this.file = file;
                this.id = id;
                this.averageSize=averageSize;
                this.blockSize = blockSize;
                this.bufferSize=bufferSize;
                
                this.subject= SUBJECT_NAME + id;
                try {
                        init();
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        if (logger.isDebugEnabled()) {
                                logger.debug("?????? @ ???ActiveMQ????", e);
                        }
                        e.printStackTrace();
                }
        }
        
        /**
         * ???ActiveMQ????
         * 
         * @throws JMSException
         */
        public void init() throws JMSException {
                // create connection
                ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
                                ActiveMQConnection.DEFAULT_USER,
                                ActiveMQConnection.DEFAULT_PASSWORD, url);
                connection = connectionFactory.createConnection();
//              connection.setClientID("FILE");
                connection.start();

                byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
                                : ActiveMQDestination.QUEUE_TYPE;
                ActiveMQDestination destination = ActiveMQDestination
                                .createDestination(subject, defaultType);
                
                Map streamProperties = new HashMap();
        streamProperties.put("fileName", "test_file_trans");
        
                this.outputStream = (ActiveMQOutputStream) ((ActiveMQConnection)
connection)
                                .createOutputStream(destination, null,
                                                DeliveryMode.NON_PERSISTENT,
                                                
ActiveMQMessage.DEFAULT_PRIORITY,
                                                
ActiveMQMessage.DEFAULT_TIME_TO_LIVE);

                if (logger.isDebugEnabled()) {
                        logger.debug("?????? @ ???ActiveMQ?????subject=" + 
subject
                                        + "defaultType=" + defaultType);
                }

        }

        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        public void run() {
                if(logger.isDebugEnabled())
                {
                        logger.debug("??id= "+this.id+" ??????????");
                }
                //???????????64K
                if(this.bufferSize<=0 )
                {
                        this.bufferSize=64*1024;
                }
                if(this.bufferSize>this.averageSize)
                {
                        this.bufferSize=(int)this.averageSize;
                }
                //?????
                RandomAccessFile raf=null;
                try {
                        raf=new RandomAccessFile(file,"r");
                } catch (FileNotFoundException e) {
                        // TODO Auto-generated catch block
                        if(logger.isErrorEnabled()){
                                logger.error("?????? @ 
???????????"+this.file.getName());
                        }
                        e.printStackTrace();
                        return;
                }
                
                byte[] buffer=new byte[bufferSize];
                int len;
                int totalLen=0;
                long pos=id*averageSize;
                try {
                        raf.seek(pos);
                } catch (IOException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                }
                try {
                        while((len=raf.read(buffer))!=-1 && totalLen<blockSize)
                        {
                                if(totalLen+len<=blockSize){
                                        outputStream.write(buffer,0,len);
                                }else{
                                        
outputStream.write(buffer,0,(int)(blockSize-totalLen));
                                }
//                              outputStream.flush();
                                byte[] b= new byte[len];
                                System.arraycopy(buffer, 0, b, 0, len);
//                              System.out.println(StringTool.toHexTable(b));
                                totalLen+=len;
                                System.out.println("Thread id="+id+",sent bytes 
"+len+"
totalLen="+totalLen+" blockSize="+blockSize);
                                System.out.println("file pointer 
position="+raf.getFilePointer());
                                try {
                                        Thread.sleep(10);
                                } catch (InterruptedException e) {
                                        // TODO Auto-generated catch block
                                        e.printStackTrace();
                                }
                        }
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        if(logger.isErrorEnabled()){
                                logger.error("?????? @ ????????");
                        }
                        e.printStackTrace();
                }
                
                try {
                        raf.close();
                        outputStream.close();
                        try {
                                connection.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                logger.error("?????? @???id?"+this.id+" 
????????");
                                e.printStackTrace();
                        }
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        logger.error("?????? @??????"+this.id+" ????????");
                        e.printStackTrace();
                }
                

        }
        
        public static void main(String[] args){
                File file=new File("E:\\??????2004.RAR");
                long fileSize = file.length();
                
                int count=2;
                long averageSize=fileSize/count;
                for(int i=0;i<count;i++)
                {
                        ActivemqFileSender afs=null;
                        if(i<count-1){
                                afs=new 
ActivemqFileSender(file,i,averageSize,averageSize,64*1024);
                        }else{
                                afs=new
ActivemqFileSender(file,i,averageSize,fileSize-(count-1)*averageSize,64*1024);
                        }
                        
                        new Thread(afs).start();
//                      try {
////                            Thread.sleep(1000);
//                      } catch (InterruptedException e) {
//                              // TODO Auto-generated catch block
//                              e.printStackTrace();
//                      }
                }                       
        }

}

receiver code:

/**
 * ActivemqFileReceiver.java
 * Copyright(C) 2006 Agree Tech, All rights reserved.
 * Created on 2006-7-18 ??10:18:13 by wang
 */
package cn.com.agree.eai.file;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActivemqFileReceiver implements Runnable {

        private static final Log logger = LogFactory
                        .getLog(ActivemqFileReceiver.class);

        /** ???????????????? */
        public long averageSize;

        /** ??ID????id???????????? */
        public int id;

        /** ????????????? */
        public long blockSize;

        /** ???????????? */
        public int bufferSize;

        /** ????????? */
        public File file;

        // ???activeMQ??????
        /** Active MQ ???? */
        public String url = ActivemqFileSender.DEFAULT_MQ_URL;

        /** ??????queue???????topic?? */
        public String name;

        /**
         * true : topic false : queue
         */
        public boolean isTopic = false;

        public String subject;

        private Connection connection;

        private ActiveMQInputStream inputStream;

        /**
         * ???????????ID
         * 
         * @param file
         *            ????
         * @param id
         *            ??Id
         * @param averageSize
         *            ??????????????
         * @param blockSize
         *            ?????????????????averageSize
         * @param bufferSize
         *            ?????
         * @throws JMSException
         */
        public ActivemqFileReceiver(File file, int id, long averageSize,
                        long blockSize, int bufferSize) {
                // TODO Auto-generated constructor stub
                this.file = file;
                this.id = id;
                this.averageSize = averageSize;
                this.blockSize = blockSize;
                this.bufferSize = bufferSize;

                this.subject = ActivemqFileSender.SUBJECT_NAME + id;
                try {
                        init();
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
        }

        public void init() throws JMSException {
                ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
                                ActiveMQConnection.DEFAULT_USER,
                                ActiveMQConnection.DEFAULT_PASSWORD, url);

                connection = connectionFactory.createConnection();
//              connection.setClientID("FILE");
                connection.start();
                
//              Session session=connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                

                byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
                                : ActiveMQDestination.QUEUE_TYPE;
                ActiveMQDestination destination = ActiveMQDestination
                                .createDestination(subject, defaultType);
                
//              session.createConsumer(destination);

                inputStream = (ActiveMQInputStream) ((ActiveMQConnection) 
connection)
                                .createInputStream(destination, null);

                if (logger.isDebugEnabled()) {
                        logger.debug("?????? @ ???ActiveMQ?????subject=" + 
subject
                                        + "defaultType=" + defaultType);
                }

        }

        /*
         * (non-Javadoc)
         * 
         * @see java.lang.Runnable#run()
         */
        public void run() {
                if (logger.isDebugEnabled()) {
                        logger.debug("??id= " + this.id + " ??????????");
                }
                // ???????????16K
                if (this.bufferSize <= 0) {
                        this.bufferSize = 64 * 1024;
                }
                if (this.bufferSize > this.averageSize) {
                        this.bufferSize = (int) this.averageSize;
                }
                // ?????
                RandomAccessFile raf = null;
                try {
                        raf = new RandomAccessFile(file, "rwd");
                } catch (FileNotFoundException e) {
                        // TODO Auto-generated catch block
                        if (logger.isErrorEnabled()) {
                                logger.error("?????? @ ???????????" + 
this.file.getName());
                        }
                        e.printStackTrace();
                        try {
                                raf.close();
                        } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                        }
                        try {
                                connection.close();
                        } catch (JMSException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                        }
                        try {
                                inputStream.close();
                        } catch (IOException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                        }
                        return;
                }
                byte[] buffer = new byte[bufferSize];
                int len, totalLen = 0;
                long pos = id * averageSize;
                try {
                        raf.seek(pos);
                } catch (IOException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                }
                try {
                        while ((len = inputStream.read(buffer, 0, bufferSize)) 
!= -1
                                        && totalLen < blockSize) {
                                byte[] b = new byte[len];
                                System.arraycopy(buffer, 0, b, 0, len);
                                // System.out.println(new String(b));
                                raf.write(b);
                                totalLen += len;

//                              System.out.println(StringTool.toHexTable(b));
                                System.out.println("Thread id="+id+",received 
bytes "+len+"
totalLen="+totalLen+" blockSize="+blockSize);
                        }
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        if (logger.isErrorEnabled()) {
                                logger.error("?????? @ ????????");
                        }
                        e.printStackTrace();
                }
                try {
                        raf.close();
                        inputStream.close();
                        try {
                                connection.close();
                        } catch (JMSException e) {
                                // TODO Auto-generated catch block
                                logger.error("?????? @???id?" + this.id + " 
????????");
                                e.printStackTrace();
                        }
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        logger.error("?????? @??????" + this.id + " ????????");
                        e.printStackTrace();
                }

        }

        public static void main(String[] args) throws IOException {
                File file = new File("d:\\test.rar");
                File f = new File(
                                "E:\\??????2004.RAR");
                RandomAccessFile raf = new RandomAccessFile(file, "rwd");
                raf.setLength(f.length());
                int count=2;
                long fileSize = f.length();
                long averageSize = fileSize / count;

                int buffSize = 64 * 1024;
                for (int i = 0; i < count; i++) {
                        ActivemqFileReceiver afr = null;
                        if (i < count-1) {
                                afr = new ActivemqFileReceiver(file, i, 
averageSize,
                                                averageSize, buffSize);
                        } else {
                                afr = new ActivemqFileReceiver(file, i, 
averageSize, fileSize
                                                - (count-1) * averageSize, 
buffSize);
                        }

                        new Thread(afr).start();
                        // try {
                        // // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // // TODO Auto-generated catch block
                        // e.printStackTrace();
                        // }
                }
                raf.close();
        }

}



-- 
View this message in context: 
http://www.nabble.com/Question-about-ActiveInputStream-and-ActiveMQOutputStream-tf1978079.html#a5427288
Sent from the ActiveMQ - User forum at Nabble.com.

Reply via email to