This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit da0b21fb846de119897f6086737be5de34f1cbac Author: laohu <[email protected]> AuthorDate: Thu Jun 6 14:36:18 2019 +0800 change --- README-CN.md | 16 +++++++++ README.md | 16 ++++++++- .../apache/rocketmq/connect/activemq/Config.java | 38 ++++++++++++++++++++-- .../rocketmq/connect/activemq/Replicator.java | 3 -- .../connect/activemq/connector/ActivemqTask.java | 10 ++++-- .../connect/activemq/pattern/PatternProcessor.java | 16 ++++----- 6 files changed, 81 insertions(+), 18 deletions(-) diff --git a/README-CN.md b/README-CN.md new file mode 100644 index 0000000..bd745e2 --- /dev/null +++ b/README-CN.md @@ -0,0 +1,16 @@ +##### ActiveConnector完全限定名 +org.apache.rocketmq.connect.activemq.connector.ActivemqConnector + + +##### 配置参数 + +参数 | 作用 | 是否必填 | 默认值 +---|--- |--- +activemq.url | activemq ip与端口号 | 是 | 无 +activemq.username | 用户名 | 否 | 无 +activemq.password| 密码 | 否 | 无 +jms.destination.name | 读取的队列或者主题名 | 是 | 无 +jms.destination.type | 读取的类型:queue(队列)或者topic(主题) | 是 | 无 +jms.message.selector | 过滤器 | 否 |无 +jms.session.acknowledge.mode | 消息确认 | 否 | Session.AUTO_ACKNOWLEDGE +jms.session.transacted | 是否是事务会话 | 否 | false diff --git a/README.md b/README.md index 8b13789..e965865 100644 --- a/README.md +++ b/README.md @@ -1 +1,15 @@ - +参数 | 作用 | +---|--- |--- +activemq.url | activemq ip与端口号 +activemq.username | 用户名 +activemq.password| 密码 +jms.destination.name | 读取的队列或者主题名 +jms.destination.type | 读取的类型:queue(队列)或者topic(主题) +jms.message.selector | 过滤器 +jms.session.acknowledge.mode | 消息确认 +jms.session.transacted | 是否是事务会话 +rocketmq.topic | 发送的topic +rocketmq.name | broker的用户名 +rocketmq.sk | +rocketmq.ak | +rocketmq.nameserver | nameserver url diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java index 15a95a7..af218c0 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/Config.java @@ -21,15 +21,16 @@ import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; +import javax.jms.Session; + import io.openmessaging.KeyValue; public class Config { + @SuppressWarnings("serial") public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { add("activemqUrl"); - add("activemqUsername"); - add("activemqPassword"); add("destinationType"); add("destinationName"); } @@ -44,7 +45,13 @@ public class Config { public String destinationType; public String destinationName; + + public String messageSelector; + + private Integer sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private Boolean sessionTransacted = Boolean.FALSE; + public void load(KeyValue props) { properties2Object(props, this); @@ -130,4 +137,31 @@ public class Config { public void setDestinationName(String destinationName) { this.destinationName = destinationName; } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public Integer getSessionAcknowledgeMode() { + return sessionAcknowledgeMode; + } + + public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) { + this.sessionAcknowledgeMode = sessionAcknowledgeMode; + } + + public Boolean getSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(Boolean sessionTransacted) { + this.sessionTransacted = sessionTransacted; + } + + + } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java index 499beb0..dd83c37 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java @@ -30,12 +30,9 @@ public class Replicator { private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class); - private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger"); - private PatternProcessor processor; private Config config; - private Object lock = new Object(); private BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); public Replicator(Config config){ diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java index f871be5..9743a19 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqTask.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.connect.activemq.connector; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.DataEntryBuilder; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.connector.api.source.SourceTask; @@ -50,7 +52,11 @@ public class ActivemqTask extends SourceTask { try { Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); - SourceDataEntry sourceDataEntry = null; + DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(null); + dataEntryBuilder.timestamp(System.currentTimeMillis()); + SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( + ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")), + ByteBuffer.wrap(JSON.toJSONBytes(message))); res.add(sourceDataEntry); } catch (Exception e) { @@ -66,10 +72,10 @@ public class ActivemqTask extends SourceTask { this.config = new Config(); this.config.load(props); this.replicator = new Replicator(config); - this.replicator.start(); } catch (Exception e) { log.error("Mysql task start failed.", e); } + this.replicator.start(); } @Override diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java index c1b282c..b26bfb9 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java @@ -32,31 +32,27 @@ public class PatternProcessor { } public void start() { + if(!StringUtils.equals("topic", config.getDestinationType())&&!StringUtils.equals("queue", config.getDestinationType())) { + throw new RuntimeException("destination type is incorrectness"); + } + try { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); - //2、使用连接工厂创建一个连接对象 if(StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword()) ) { connection = connectionFactory.createConnection(config.getActivemqUsername() , config.getActivemqPassword()); }else { connection = connectionFactory.createConnection(); } - //3、开启连接 connection.start(); - //4、使用连接对象创建会话(session)对象 - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) + Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); Destination destination = null; if(StringUtils.equals("topic", config.getDestinationType())) { destination = session.createTopic(config.getDestinationName()); }else if(StringUtils.equals("queue", config.getDestinationType())){ destination = session.createQueue(config.getDestinationName()); - }else { - throw new RuntimeException(""); } - consumer = session.createConsumer(destination); - //6、使用会话对象创建生产者对象 - //7、向consumer对象中设置一个messageListener对象,用来接收消息 + consumer = session.createConsumer(destination, config.getMessageSelector()); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) {
