Hi everyone,
I am using ActiveMQ streams to transfer a large file. When I run the
transfer in one or two threads, it works fine. But if I start more than 3
threads to transfer data concurrently, then after some data has been
transferred, the sender and receiver threads all end up in a waiting state.
Please give me some help or advice about this. Thanks a lot.
The following is my sender and receiver code.
Sender:
/**
* ActivemqFileSender.java
* Copyright(C) 2006 Agree Tech, All rights reserved.
* Created on 2006-7-17 ??11:17:16 by wang
*/
package cn.com.agree.eai.file;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQOutputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import cn.com.agree.tools.StringTool;
public class ActivemqFileSender implements Runnable {
/** ?????? */
private final static Log logger = LogFactory
.getLog(ActivemqFileSender.class);
public static String DEFAULT_MQ_URL = "tcp://127.0.0.1:61636";
public static String SUBJECT_NAME = "FILE.TRANS";
/** ???????????????? */
public long averageSize;
/** ??ID????id???????????? */
public int id;
/** ????????????? */
public long blockSize;
/**???????????? */
public int bufferSize;
/** ????????? */
public File file;
// ???activeMQ??????
/** Active MQ ???? */
public String url = DEFAULT_MQ_URL;
/** ??????queue???????topic?? */
public String name;
/**
* true : topic false : queue
*/
public boolean isTopic = false;
public String subject ;
private Connection connection;
private ActiveMQOutputStream outputStream;
/**
* ???????????id
* @param file ????
* @param id ??Id
* @param averageSize ??????????????
* @param blockSize ?????????????????averageSize
* @param bufferSize ?????
* @throws JMSException
*/
public ActivemqFileSender(File file, int id,long averageSize, long
blockSize,int bufferSize) {
// TODO Auto-generated constructor stub
this.file = file;
this.id = id;
this.averageSize=averageSize;
this.blockSize = blockSize;
this.bufferSize=bufferSize;
this.subject= SUBJECT_NAME + id;
try {
init();
} catch (JMSException e) {
// TODO Auto-generated catch block
if (logger.isDebugEnabled()) {
logger.debug("?????? @ ???ActiveMQ????", e);
}
e.printStackTrace();
}
}
/**
* ???ActiveMQ????
*
* @throws JMSException
*/
public void init() throws JMSException {
// create connection
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, url);
connection = connectionFactory.createConnection();
// connection.setClientID("FILE");
connection.start();
byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
: ActiveMQDestination.QUEUE_TYPE;
ActiveMQDestination destination = ActiveMQDestination
.createDestination(subject, defaultType);
Map streamProperties = new HashMap();
streamProperties.put("fileName", "test_file_trans");
this.outputStream = (ActiveMQOutputStream) ((ActiveMQConnection)
connection)
.createOutputStream(destination, null,
DeliveryMode.NON_PERSISTENT,
ActiveMQMessage.DEFAULT_PRIORITY,
ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
if (logger.isDebugEnabled()) {
logger.debug("?????? @ ???ActiveMQ?????subject=" +
subject
+ "defaultType=" + defaultType);
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
if(logger.isDebugEnabled())
{
logger.debug("??id= "+this.id+" ??????????");
}
//???????????64K
if(this.bufferSize<=0 )
{
this.bufferSize=64*1024;
}
if(this.bufferSize>this.averageSize)
{
this.bufferSize=(int)this.averageSize;
}
//?????
RandomAccessFile raf=null;
try {
raf=new RandomAccessFile(file,"r");
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
if(logger.isErrorEnabled()){
logger.error("?????? @
???????????"+this.file.getName());
}
e.printStackTrace();
return;
}
byte[] buffer=new byte[bufferSize];
int len;
int totalLen=0;
long pos=id*averageSize;
try {
raf.seek(pos);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
while((len=raf.read(buffer))!=-1 && totalLen<blockSize)
{
if(totalLen+len<=blockSize){
outputStream.write(buffer,0,len);
}else{
outputStream.write(buffer,0,(int)(blockSize-totalLen));
}
// outputStream.flush();
byte[] b= new byte[len];
System.arraycopy(buffer, 0, b, 0, len);
// System.out.println(StringTool.toHexTable(b));
totalLen+=len;
System.out.println("Thread id="+id+",sent bytes
"+len+"
totalLen="+totalLen+" blockSize="+blockSize);
System.out.println("file pointer
position="+raf.getFilePointer());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
if(logger.isErrorEnabled()){
logger.error("?????? @ ????????");
}
e.printStackTrace();
}
try {
raf.close();
outputStream.close();
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
logger.error("?????? @???id?"+this.id+"
????????");
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("?????? @??????"+this.id+" ????????");
e.printStackTrace();
}
}
public static void main(String[] args){
File file=new File("E:\\??????2004.RAR");
long fileSize = file.length();
int count=2;
long averageSize=fileSize/count;
for(int i=0;i<count;i++)
{
ActivemqFileSender afs=null;
if(i<count-1){
afs=new
ActivemqFileSender(file,i,averageSize,averageSize,64*1024);
}else{
afs=new
ActivemqFileSender(file,i,averageSize,fileSize-(count-1)*averageSize,64*1024);
}
new Thread(afs).start();
// try {
//// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
}
}
receiver code:
/**
* ActivemqFileReceiver.java
* Copyright(C) 2006 Agree Tech, All rights reserved.
* Created on 2006-7-18 ??10:18:13 by wang
*/
package cn.com.agree.eai.file;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ActivemqFileReceiver implements Runnable {
private static final Log logger = LogFactory
.getLog(ActivemqFileReceiver.class);
/** ???????????????? */
public long averageSize;
/** ??ID????id???????????? */
public int id;
/** ????????????? */
public long blockSize;
/** ???????????? */
public int bufferSize;
/** ????????? */
public File file;
// ???activeMQ??????
/** Active MQ ???? */
public String url = ActivemqFileSender.DEFAULT_MQ_URL;
/** ??????queue???????topic?? */
public String name;
/**
* true : topic false : queue
*/
public boolean isTopic = false;
public String subject;
private Connection connection;
private ActiveMQInputStream inputStream;
/**
* ???????????ID
*
* @param file
* ????
* @param id
* ??Id
* @param averageSize
* ??????????????
* @param blockSize
* ?????????????????averageSize
* @param bufferSize
* ?????
* @throws JMSException
*/
public ActivemqFileReceiver(File file, int id, long averageSize,
long blockSize, int bufferSize) {
// TODO Auto-generated constructor stub
this.file = file;
this.id = id;
this.averageSize = averageSize;
this.blockSize = blockSize;
this.bufferSize = bufferSize;
this.subject = ActivemqFileSender.SUBJECT_NAME + id;
try {
init();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void init() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, url);
connection = connectionFactory.createConnection();
// connection.setClientID("FILE");
connection.start();
// Session session=connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
: ActiveMQDestination.QUEUE_TYPE;
ActiveMQDestination destination = ActiveMQDestination
.createDestination(subject, defaultType);
// session.createConsumer(destination);
inputStream = (ActiveMQInputStream) ((ActiveMQConnection)
connection)
.createInputStream(destination, null);
if (logger.isDebugEnabled()) {
logger.debug("?????? @ ???ActiveMQ?????subject=" +
subject
+ "defaultType=" + defaultType);
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
if (logger.isDebugEnabled()) {
logger.debug("??id= " + this.id + " ??????????");
}
// ???????????16K
if (this.bufferSize <= 0) {
this.bufferSize = 64 * 1024;
}
if (this.bufferSize > this.averageSize) {
this.bufferSize = (int) this.averageSize;
}
// ?????
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(file, "rwd");
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
if (logger.isErrorEnabled()) {
logger.error("?????? @ ???????????" +
this.file.getName());
}
e.printStackTrace();
try {
raf.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
connection.close();
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
inputStream.close();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return;
}
byte[] buffer = new byte[bufferSize];
int len, totalLen = 0;
long pos = id * averageSize;
try {
raf.seek(pos);
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
while ((len = inputStream.read(buffer, 0, bufferSize))
!= -1
&& totalLen < blockSize) {
byte[] b = new byte[len];
System.arraycopy(buffer, 0, b, 0, len);
// System.out.println(new String(b));
raf.write(b);
totalLen += len;
// System.out.println(StringTool.toHexTable(b));
System.out.println("Thread id="+id+",received
bytes "+len+"
totalLen="+totalLen+" blockSize="+blockSize);
}
} catch (IOException e) {
// TODO Auto-generated catch block
if (logger.isErrorEnabled()) {
logger.error("?????? @ ????????");
}
e.printStackTrace();
}
try {
raf.close();
inputStream.close();
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
logger.error("?????? @???id?" + this.id + "
????????");
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error("?????? @??????" + this.id + " ????????");
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
File file = new File("d:\\test.rar");
File f = new File(
"E:\\??????2004.RAR");
RandomAccessFile raf = new RandomAccessFile(file, "rwd");
raf.setLength(f.length());
int count=2;
long fileSize = f.length();
long averageSize = fileSize / count;
int buffSize = 64 * 1024;
for (int i = 0; i < count; i++) {
ActivemqFileReceiver afr = null;
if (i < count-1) {
afr = new ActivemqFileReceiver(file, i,
averageSize,
averageSize, buffSize);
} else {
afr = new ActivemqFileReceiver(file, i,
averageSize, fileSize
- (count-1) * averageSize,
buffSize);
}
new Thread(afr).start();
// try {
// // Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
raf.close();
}
}
--
View this message in context:
http://www.nabble.com/Question-about-ActiveInputStream-and-ActiveMQOutputStream-tf1978079.html#a5427288
Sent from the ActiveMQ - User forum at Nabble.com.