This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit db391c5c580d1d251aff442d994c9932af1a35c6 Author: laohu <[email protected]> AuthorDate: Thu Jun 13 10:12:29 2019 +0800 change exception --- pom.xml | 5 -- .../rocketmq/connect/activemq/ErrorCode.java | 8 +++ .../rocketmq/connect/activemq/Replicator.java | 18 +++--- .../activemq/connector/ActivemqSourceTask.java | 21 +++++-- .../connect/activemq/pattern/PatternProcessor.java | 66 ++++++++++------------ .../rocketmq/connect/activemq/ReplicatorTest.java | 4 +- 6 files changed, 64 insertions(+), 58 deletions(-) diff --git a/pom.xml b/pom.xml index 6e933bd..473a663 100644 --- a/pom.xml +++ b/pom.xml @@ -156,11 +156,6 @@ <version>0.1.0-beta</version> </dependency> <dependency> - <groupId>io.openmessaging</groupId> - <artifactId>openmessaging-api</artifactId> - <version>0.3.1-alpha</version> - </dependency> - <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java new file mode 100644 index 0000000..de3b3f5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/activemq/ErrorCode.java @@ -0,0 +1,8 @@ +package org.apache.rocketmq.connect.activemq; + +public class ErrorCode { + + public static final int START_ERROR_CODE = 10001; + + public static final int STOP_ERROR_CODE = 10002; +} diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java index 2b18481..e0ebe12 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/Replicator.java @@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; + import javax.jms.Message; + import org.apache.rocketmq.connect.activemq.pattern.PatternProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +39,13 @@ public class Replicator { this.config = config; } - public void start() { - - try { - processor = new PatternProcessor(this); - processor.start(); - - } catch (Exception e) { - LOGGER.error("Start error.", e); - throw new RuntimeException(e); - } + public void start() throws Exception { + processor = new PatternProcessor(this); + processor.start(); + LOGGER.info("Replicator start succeed"); } - public void stop() { + public void stop() throws Exception { processor.stop(); } diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java index 2140b5f..9d1a1aa 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java @@ -19,7 +19,9 @@ package org.apache.rocketmq.connect.activemq.connector; import com.alibaba.fastjson.JSON; import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.EntryType; import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.exception.DataConnectException; import io.openmessaging.connector.api.source.SourceTask; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -38,6 +40,7 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.rocketmq.connect.activemq.Config; +import org.apache.rocketmq.connect.activemq.ErrorCode; import org.apache.rocketmq.connect.activemq.Replicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +61,8 @@ public class ActivemqSourceTask extends SourceTask { try { Message message = replicator.getQueue().poll(1000, TimeUnit.MILLISECONDS); if (message != null) { - SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), null, config.getDestinationName(), null, null); + Object[] payload = new Object[] {config.getDestinationType(), config.getDestinationName(), getMessageConnent(message)}; + SourceDataEntry sourceDataEntry = new SourceDataEntry(sourcePartition, getMessageConnent(message), System.currentTimeMillis(), EntryType.CREATE, null, null, payload); res.add(sourceDataEntry); } } catch (Exception e) { @@ -74,15 +78,21 @@ public class ActivemqSourceTask extends SourceTask { this.config.load(props); this.sourcePartition = ByteBuffer.wrap(config.getActivemqUrl().getBytes("UTF-8")); this.replicator = new Replicator(config); + this.replicator.start(); } catch (Exception e) { - log.error("Mysql task start failed.", e); + log.error("activemq task start failed.", e); + throw new DataConnectException(ErrorCode.START_ERROR_CODE, e.getMessage(), e); } - this.replicator.start(); } @Override public void stop() { - replicator.stop(); + try { + replicator.stop(); + } catch (Exception e) { + log.error("activemq task stop failed.", e); + throw new DataConnectException(ErrorCode.STOP_ERROR_CODE, e.getMessage(), e); + } } @Override public void pause() { @@ -123,8 +133,9 @@ public class ActivemqSourceTask extends SourceTask { } data = bis.toByteArray(); } else { + // The exception is printed and does not need to be written as a DataConnectException throw new RuntimeException("message type exception"); } - return data != null ? ByteBuffer.wrap(data) : null; + return ByteBuffer.wrap(data); } } diff --git a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java index b0836f9..6e39a7e 100644 --- a/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java +++ b/src/main/java/org/apache/rocketmq/connect/activemq/pattern/PatternProcessor.java @@ -20,11 +20,11 @@ package org.apache.rocketmq.connect.activemq.pattern; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.connect.activemq.Config; @@ -47,47 +47,43 @@ public class PatternProcessor { this.config = replicator.getConfig(); } - public void start() { - if (!StringUtils.equals("topic", config.getDestinationType()) && !StringUtils.equals("queue", config.getDestinationType())) { + public void start() throws Exception { + if (!StringUtils.equals("topic", config.getDestinationType()) + && !StringUtils.equals("queue", config.getDestinationType())) { + // RuntimeException is caught by DataConnectException throw new RuntimeException("destination type is incorrectness"); } - try { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActivemqUrl()); - if (StringUtils.isNotBlank(config.getActivemqUsername()) && StringUtils.isNotBlank(config.getActivemqPassword())) { - connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword()); - } else { - connection = connectionFactory.createConnection(); - } - connection.start(); - Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); - Destination destination = null; - if (StringUtils.equals("topic", config.getDestinationType())) { - destination = session.createTopic(config.getDestinationName()); - } else if (StringUtils.equals("queue", config.getDestinationType())) { - destination = session.createQueue(config.getDestinationName()); - } - consumer = session.createConsumer(destination, config.getMessageSelector()); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - replicator.commit(message, true); - } - }); - } catch (Exception e) { - throw new RuntimeException(e); + if (StringUtils.isNotBlank(config.getActivemqUsername()) + && StringUtils.isNotBlank(config.getActivemqPassword())) { + connection = connectionFactory.createConnection(config.getActivemqUsername(), config.getActivemqPassword()); + } else { + connection = connectionFactory.createConnection(); + } + connection.start(); + Session session = connection.createSession(config.getSessionTransacted(), config.getSessionAcknowledgeMode()); + Destination destination = null; + if (StringUtils.equals("topic", config.getDestinationType())) { + destination = session.createTopic(config.getDestinationName()); + } else if (StringUtils.equals("queue", config.getDestinationType())) { + destination = session.createQueue(config.getDestinationName()); } + consumer = session.createConsumer(destination, config.getMessageSelector()); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + replicator.commit(message, true); + } + }); + } - public void stop() { - try { - consumer.close(); - session.close(); - connection.close(); - } catch (JMSException e) { - throw new RuntimeException(e); - } + public void stop() throws Exception { + consumer.close(); + session.close(); + connection.close(); } } diff --git a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java index 28cafa4..b94237e 100644 --- a/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/activemq/ReplicatorTest.java @@ -50,12 +50,12 @@ public class ReplicatorTest { } @Test(expected = RuntimeException.class) - public void startTest() { + public void startTest() throws Exception { replicator.start(); } @Test - public void stop() { + public void stop() throws Exception { replicator.stop(); Mockito.verify(patternProcessor, Mockito.times(1)).stop(); }
