[ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1d966b50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1d966b50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1d966b50 Branch: refs/heads/release-4.1.0-incubating Commit: 1d966b50c2ec189ca4f1bf81420959a33159a8ad Parents: 1630f27 Author: yukon <[email protected]> Authored: Wed May 24 16:50:51 2017 +0800 Committer: yukon <[email protected]> Committed: Wed May 24 16:50:51 2017 +0800 ---------------------------------------------------------------------- distribution/release-client.xml | 1 + distribution/release.xml | 1 + example/pom.xml | 9 + .../example/openmessaging/SimpleProducer.java | 76 +++++++ .../openmessaging/SimplePullConsumer.java | 58 +++++ .../openmessaging/SimplePushConsumer.java | 59 +++++ openmessaging/pom.xml | 42 ++++ .../rocketmq/MessagingAccessPointImpl.java | 132 +++++++++++ .../rocketmq/config/ClientConfig.java | 194 ++++++++++++++++ .../rocketmq/consumer/LocalMessageCache.java | 213 +++++++++++++++++ .../rocketmq/consumer/PullConsumerImpl.java | 166 ++++++++++++++ .../rocketmq/consumer/PushConsumerImpl.java | 181 +++++++++++++++ .../rocketmq/domain/BytesMessageImpl.java | 108 +++++++++ .../rocketmq/domain/ConsumeRequest.java | 55 +++++ .../rocketmq/domain/NonStandardKeys.java | 30 +++ .../rocketmq/domain/SendResultImpl.java | 40 ++++ .../rocketmq/producer/AbstractOMSProducer.java | 138 +++++++++++ .../rocketmq/producer/ProducerImpl.java | 124 ++++++++++ .../rocketmq/producer/SequenceProducerImpl.java | 95 ++++++++ .../rocketmq/promise/DefaultPromise.java | 227 +++++++++++++++++++ .../rocketmq/promise/FutureState.java | 51 +++++ .../openmessaging/rocketmq/utils/BeanUtils.java | 185 +++++++++++++++ .../openmessaging/rocketmq/utils/OMSUtil.java | 182 +++++++++++++++ .../consumer/LocalMessageCacheTest.java | 89 ++++++++ 
.../rocketmq/consumer/PullConsumerImplTest.java | 96 ++++++++ .../rocketmq/consumer/PushConsumerImplTest.java | 87 +++++++ .../rocketmq/producer/ProducerImplTest.java | 101 +++++++++ .../producer/SequenceProducerImplTest.java | 86 +++++++ .../rocketmq/promise/DefaultPromiseTest.java | 136 +++++++++++ .../rocketmq/utils/BeanUtilsTest.java | 110 +++++++++ pom.xml | 6 + 31 files changed, 3078 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/distribution/release-client.xml ---------------------------------------------------------------------- diff --git a/distribution/release-client.xml b/distribution/release-client.xml index 46563eb..84d33a0 100644 --- a/distribution/release-client.xml +++ b/distribution/release-client.xml @@ -47,6 +47,7 @@ <useAllReactorProjects>true</useAllReactorProjects> <includes> <include>org.apache.rocketmq:rocketmq-client</include> + <include>org.apache.rocketmq:rocketmq-openmessaging</include> </includes> <binaries> <outputDirectory>./</outputDirectory> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/distribution/release.xml ---------------------------------------------------------------------- diff --git a/distribution/release.xml b/distribution/release.xml index 9e4ef2a..c67d23e 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -68,6 +68,7 @@ <include>org.apache.rocketmq:rocketmq-filtersrv</include> <include>org.apache.rocketmq:rocketmq-example</include> <include>org.apache.rocketmq:rocketmq-filter</include> + <include>org.apache.rocketmq:rocketmq-openmessaging</include> </includes> <binaries> <outputDirectory>lib/</outputDirectory> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/pom.xml ---------------------------------------------------------------------- diff --git a/example/pom.xml b/example/pom.xml index 785a4ca..840fa36 100644 --- a/example/pom.xml +++ 
b/example/pom.xml @@ -48,5 +48,14 @@ <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.1.0-incubating-SNAPSHOT</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java new file mode 100644 index 0000000..9d162ac --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.SendResult; +import java.nio.charset.Charset; + +public class SimpleProducer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + producer.startup(); + System.out.printf("Producer startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + producer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + { + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + //final Void aVoid = result.get(3000L); + System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); + } + + { + final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener<SendResult>() { + @Override + public void operationCompleted(Promise<SendResult> promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); + } + + @Override + public void operationFailed(Promise<SendResult> promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + } + }); + } + + { + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", 
"OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.printf("Send oneway message OK%n"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java new file mode 100644 index 0000000..8e06772 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + + while (true) { + Message message = consumer.poll(); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.printf("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java new file mode 100644 index 0000000..b0935d4 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. 
+ createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + context.ack(); + } + }); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/pom.xml ---------------------------------------------------------------------- diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml new file mode 100644 index 0000000..e853642 --- /dev/null +++ b/openmessaging/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.1.0-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-openmessaging</artifactId> + <name>rocketmq-openmessaging ${project.version}</name> + + <dependencies> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java new file mode 100644 index 0000000..65caf84 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq; + +import io.openmessaging.IterableConsumer; +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.Producer; +import io.openmessaging.PullConsumer; +import io.openmessaging.PushConsumer; +import io.openmessaging.ResourceManager; +import io.openmessaging.SequenceProducer; +import io.openmessaging.ServiceEndPoint; +import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.observer.Observer; +import io.openmessaging.rocketmq.consumer.PullConsumerImpl; +import io.openmessaging.rocketmq.consumer.PushConsumerImpl; +import io.openmessaging.rocketmq.producer.ProducerImpl; +import io.openmessaging.rocketmq.producer.SequenceProducerImpl; +import io.openmessaging.rocketmq.utils.OMSUtil; + +public class MessagingAccessPointImpl implements MessagingAccessPoint { + private final KeyValue accessPointProperties; + + public MessagingAccessPointImpl(final KeyValue accessPointProperties) { + this.accessPointProperties = accessPointProperties; + } + + @Override + public KeyValue properties() { + return accessPointProperties; + } + + @Override + public Producer createProducer() { + return new ProducerImpl(this.accessPointProperties); + } + + @Override + public Producer createProducer(KeyValue properties) { + return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public SequenceProducer createSequenceProducer() { + return new SequenceProducerImpl(this.accessPointProperties); + } + + @Override + public 
SequenceProducer createSequenceProducer(KeyValue properties) { + return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public PushConsumer createPushConsumer() { + return new PushConsumerImpl(accessPointProperties); + } + + @Override + public PushConsumer createPushConsumer(KeyValue properties) { + return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public PullConsumer createPullConsumer(String queueName) { + return new PullConsumerImpl(queueName, accessPointProperties); + } + + @Override + public PullConsumer createPullConsumer(String queueName, KeyValue properties) { + return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public IterableConsumer createIterableConsumer(String queueName) { + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + } + + @Override + public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) { + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + } + + @Override + public ResourceManager getResourceManager() { + throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); + } + + @Override + public ServiceEndPoint createServiceEndPoint() { + throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); + } + + @Override + public ServiceEndPoint createServiceEndPoint(KeyValue properties) { + throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); + } + + @Override + public void addObserver(Observer observer) { + //Ignore + } + + @Override + public void deleteObserver(Observer observer) { + //Ignore + } + + @Override + public void startup() { + //Ignore + } + + @Override + public void shutdown() { + //Ignore + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java new file mode 100644 index 0000000..7077c6d --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.openmessaging.rocketmq.config; + +import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class ClientConfig implements PropertyKeys, NonStandardKeys { + private String omsDriverImpl; + private String omsAccessPoints; + private String omsNamespace; + private String omsProducerId; + private String omsConsumerId; + private int omsOperationTimeout = 5000; + private String omsRoutingName; + private String omsOperatorName; + private String omsDstQueue; + private String omsSrcTopic; + private String rmqConsumerGroup; + private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; + private int rmqMaxRedeliveryTimes = 16; + private int rmqMessageConsumeTimeout = 15; //In minutes + private int rmqMaxConsumeThreadNums = 64; + private int rmqMinConsumeThreadNums = 20; + private String rmqMessageDestination; + private int rmqPullMessageBatchNums = 32; + private int rmqPullMessageCacheCapacity = 1000; + + public String getOmsDriverImpl() { + return omsDriverImpl; + } + + public void setOmsDriverImpl(final String omsDriverImpl) { + this.omsDriverImpl = omsDriverImpl; + } + + public String getOmsAccessPoints() { + return omsAccessPoints; + } + + public void setOmsAccessPoints(final String omsAccessPoints) { + this.omsAccessPoints = omsAccessPoints; + } + + public String getOmsNamespace() { + return omsNamespace; + } + + public void setOmsNamespace(final String omsNamespace) { + this.omsNamespace = omsNamespace; + } + + public String getOmsProducerId() { + return omsProducerId; + } + + public void setOmsProducerId(final String omsProducerId) { + this.omsProducerId = omsProducerId; + } + + public String getOmsConsumerId() { + return omsConsumerId; + } + + public void setOmsConsumerId(final String omsConsumerId) { + this.omsConsumerId = omsConsumerId; + } + + public int getOmsOperationTimeout() { + return omsOperationTimeout; + } + + public void setOmsOperationTimeout(final int omsOperationTimeout) { + 
this.omsOperationTimeout = omsOperationTimeout; + } + + public String getOmsRoutingName() { + return omsRoutingName; + } + + public void setOmsRoutingName(final String omsRoutingName) { + this.omsRoutingName = omsRoutingName; + } + + public String getOmsOperatorName() { + return omsOperatorName; + } + + public void setOmsOperatorName(final String omsOperatorName) { + this.omsOperatorName = omsOperatorName; + } + + public String getOmsDstQueue() { + return omsDstQueue; + } + + public void setOmsDstQueue(final String omsDstQueue) { + this.omsDstQueue = omsDstQueue; + } + + public String getOmsSrcTopic() { + return omsSrcTopic; + } + + public void setOmsSrcTopic(final String omsSrcTopic) { + this.omsSrcTopic = omsSrcTopic; + } + + public String getRmqConsumerGroup() { + return rmqConsumerGroup; + } + + public void setRmqConsumerGroup(final String rmqConsumerGroup) { + this.rmqConsumerGroup = rmqConsumerGroup; + } + + public String getRmqProducerGroup() { + return rmqProducerGroup; + } + + public void setRmqProducerGroup(final String rmqProducerGroup) { + this.rmqProducerGroup = rmqProducerGroup; + } + + public int getRmqMaxRedeliveryTimes() { + return rmqMaxRedeliveryTimes; + } + + public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) { + this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes; + } + + public int getRmqMessageConsumeTimeout() { + return rmqMessageConsumeTimeout; + } + + public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) { + this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout; + } + + public int getRmqMaxConsumeThreadNums() { + return rmqMaxConsumeThreadNums; + } + + public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) { + this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums; + } + + public int getRmqMinConsumeThreadNums() { + return rmqMinConsumeThreadNums; + } + + public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) { + this.rmqMinConsumeThreadNums = 
rmqMinConsumeThreadNums; + } + + public String getRmqMessageDestination() { + return rmqMessageDestination; + } + + public void setRmqMessageDestination(final String rmqMessageDestination) { + this.rmqMessageDestination = rmqMessageDestination; + } + + public int getRmqPullMessageBatchNums() { + return rmqPullMessageBatchNums; + } + + public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) { + this.rmqPullMessageBatchNums = rmqPullMessageBatchNums; + } + + public int getRmqPullMessageCacheCapacity() { + return rmqPullMessageCacheCapacity; + } + + public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { + this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java new file mode 100644 index 0000000..90f9e03 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.slf4j.Logger; + +class LocalMessageCache implements ServiceLifecycle { + private final BlockingQueue<ConsumeRequest> consumeRequestCache; + private final Map<String, ConsumeRequest> consumedRequest; + private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable; + private final 
DefaultMQPullConsumer rocketmqPullConsumer; + private final ClientConfig clientConfig; + private final ScheduledExecutorService cleanExpireMsgExecutors; + + private final static Logger log = ClientLogger.getLog(); + + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { + consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity()); + this.consumedRequest = new ConcurrentHashMap<>(); + this.pullOffsetTable = new ConcurrentHashMap<>(); + this.rocketmqPullConsumer = rocketmqPullConsumer; + this.clientConfig = clientConfig; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "OMS_CleanExpireMsgScheduledThread_")); + } + + int nextPullBatchNums() { + return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity()); + } + + long nextPullOffset(MessageQueue remoteQueue) { + if (!pullOffsetTable.containsKey(remoteQueue)) { + try { + pullOffsetTable.putIfAbsent(remoteQueue, + rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); + } catch (MQClientException e) { + log.error("A error occurred in fetch consume offset process.", e); + } + } + return pullOffsetTable.get(remoteQueue); + } + + void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + pullOffsetTable.put(remoteQueue, nextPullOffset); + } + + void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException ignore) { + } + } + + MessageExt poll() { + return poll(clientConfig.getOmsOperationTimeout()); + } + + MessageExt poll(final KeyValue properties) { + int currentPollTimeout = clientConfig.getOmsOperationTimeout(); + if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { + currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + } + return poll(currentPollTimeout); + } + + private MessageExt poll(long timeout) { + try { + 
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + MessageExt messageExt = consumeRequest.getMessageExt(); + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + return messageExt; + } + } catch (InterruptedException ignore) { + } + return null; + } + + void ack(final String messageId) { + ConsumeRequest consumeRequest = consumedRequest.remove(messageId); + if (consumeRequest != null) { + long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt())); + try { + rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + } + + void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { + consumedRequest.remove(messageExt.getMsgId()); + long offset = processQueue.removeMessage(Collections.singletonList(messageExt)); + try { + rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + + @Override + public void startup() { + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpireMsg(); + } + }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES); + } + + @Override + public void shutdown() { + ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS); + } + + private void cleanExpireMsg() { + for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() + 
.getRebalanceImpl().getProcessQueueTable().entrySet()) { + ProcessQueue pq = next.getValue(); + MessageQueue mq = next.getKey(); + ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); + if (lockTreeMap == null) { + log.error("Gets tree map lock in process queue error, may be has compatibility issue"); + return; + } + + TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap(); + + int loop = msgTreeMap.size(); + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty()) { + msg = msgTreeMap.firstEntry().getValue(); + if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) + > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { + //Expired, ack and remove it. + } else { + break; + } + } else { + break; + } + } finally { + lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("Gets expired message exception", e); + } + + try { + rocketmqPullConsumer.sendMessageBack(msg, 3); + log.info("Send expired msg back. 
topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + ack(mq, pq, msg); + } catch (Exception e) { + log.error("Send back expired msg exception", e); + } + } + } + } + + private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) { + try { + return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true); + } catch (IllegalAccessException e) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java new file mode 100644 index 0000000..8d396d4 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * OpenMessaging {@link PullConsumer} backed by a RocketMQ {@link DefaultMQPullConsumer}.
+ *
+ * <p>A scheduled pull task fetches messages for the target queue and hands them to a
+ * {@link LocalMessageCache}; {@code poll} and {@code ack} are served from that cache.
+ */
+public class PullConsumerImpl implements PullConsumer {
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final KeyValue properties;
+    // Guarded by the monitor of this object (see startup/shutdown).
+    private boolean started = false;
+    private final String targetQueueName;
+    private final MQPullConsumerScheduleService pullConsumerScheduleService;
+    private final LocalMessageCache localMessageCache;
+    private final ClientConfig clientConfig;
+
+    final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Creates a pull consumer for the given queue.
+     *
+     * @param queueName the queue (RocketMQ topic) the pull task is registered for
+     * @param properties client properties; the generated consumer id is written back into it
+     * @throws OMSRuntimeException if the consumer group or the access points are missing
+     */
+    public PullConsumerImpl(final String queueName, final KeyValue properties) {
+        this.properties = properties;
+        this.targetQueueName = queueName;
+
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String consumerGroup = clientConfig.getRmqConsumerGroup();
+        if (null == consumerGroup || consumerGroup.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+        }
+        pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
+
+        this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+        String accessPoints = clientConfig.getOmsAccessPoints();
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        // OMS separates access points with ',', the name server address list uses ';'.
+        this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+        this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
+
+        int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
+        this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
+
+        String consumerId = OMSUtil.buildInstanceName();
+        this.rocketmqPullConsumer.setInstanceName(consumerId);
+        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+        this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
+    }
+
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /**
+     * Polls one message from the local cache using the default timeout.
+     *
+     * @return the converted message, or {@code null} if the cache returned nothing
+     */
+    @Override
+    public Message poll() {
+        MessageExt rmqMsg = localMessageCache.poll();
+        return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+    }
+
+    /**
+     * Polls one message from the local cache; the given properties are passed on to the cache.
+     *
+     * @return the converted message, or {@code null} if the cache returned nothing
+     */
+    @Override
+    public Message poll(final KeyValue properties) {
+        MessageExt rmqMsg = localMessageCache.poll(properties);
+        return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+    }
+
+    @Override
+    public void ack(final String messageId) {
+        localMessageCache.ack(messageId);
+    }
+
+    /** Acknowledges the message; the supplied properties are currently not used. */
+    @Override
+    public void ack(final String messageId, final KeyValue properties) {
+        localMessageCache.ack(messageId);
+    }
+
+    /**
+     * Registers the pull task, starts the schedule service and the local cache. Calling it again
+     * on a started consumer is a no-op.
+     *
+     * @throws OMSRuntimeException wrapping the {@link MQClientException} raised on start
+     */
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                registerPullTaskCallback();
+                this.pullConsumerScheduleService.start();
+                this.localMessageCache.startup();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    /** Registers the callback that pulls from a message queue and feeds the local cache. */
+    private void registerPullTaskCallback() {
+        this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
+            @Override
+            public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
+                MQPullConsumer consumer = context.getPullConsumer();
+                try {
+                    long offset = localMessageCache.nextPullOffset(mq);
+
+                    PullResult pullResult = consumer.pull(mq, "*",
+                        offset, localMessageCache.nextPullBatchNums());
+                    ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                        .getProcessQueueTable().get(mq);
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            if (pq == null) {
+                                // Nothing was cached, so the pull offset must not move forward;
+                                // otherwise this batch would be skipped by this consumer.
+                                log.warn("No process queue for {}, pulled messages are not cached and will be pulled again.", mq);
+                                return;
+                            }
+                            pq.putMessage(pullResult.getMsgFoundList());
+                            for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
+                                localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                    localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
+                } catch (Exception e) {
+                    log.error("A error occurred in pull message process.", e);
+                }
+            }
+        });
+    }
+
+    /** Stops the local cache, the schedule service and the underlying consumer if started. */
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.localMessageCache.shutdown();
+            this.pullConsumerScheduleService.shutdown();
+            this.rocketmqPullConsumer.shutdown();
+        }
+        this.started = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java new file mode 100644 index 0000000..f9b8058 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageListener;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * OpenMessaging {@link PushConsumer} backed by a RocketMQ {@link DefaultMQPushConsumer}.
+ *
+ * <p>Each attached queue is subscribed with the {@code "*"} expression and its listener is looked
+ * up by topic when a message arrives.
+ */
+public class PushConsumerImpl implements PushConsumer {
+    private final DefaultMQPushConsumer rocketmqPushConsumer;
+    private final KeyValue properties;
+    // Guarded by the monitor of this object (see startup/shutdown).
+    private boolean started = false;
+    private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
+    private final ClientConfig clientConfig;
+
+    /**
+     * Creates and configures the underlying push consumer.
+     *
+     * @param properties client properties; the generated consumer id is written back into it
+     * @throws OMSRuntimeException if the access points or the consumer group are missing
+     */
+    public PushConsumerImpl(final KeyValue properties) {
+        this.rocketmqPushConsumer = new DefaultMQPushConsumer();
+        this.properties = properties;
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String accessPoints = clientConfig.getOmsAccessPoints();
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        // OMS separates access points with ',', the name server address list uses ';'.
+        this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+        String consumerGroup = clientConfig.getRmqConsumerGroup();
+        if (null == consumerGroup || consumerGroup.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+        }
+        this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
+        this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
+        this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
+        this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
+        this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
+
+        String consumerId = OMSUtil.buildInstanceName();
+        this.rocketmqPushConsumer.setInstanceName(consumerId);
+        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+        this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
+    }
+
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    @Override
+    public void resume() {
+        this.rocketmqPushConsumer.resume();
+    }
+
+    @Override
+    public void suspend() {
+        this.rocketmqPushConsumer.suspend();
+    }
+
+    @Override
+    public boolean isSuspended() {
+        return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+    }
+
+    /**
+     * Registers the listener for the queue and subscribes the underlying consumer to it.
+     *
+     * @return this consumer, to allow chaining
+     * @throws OMSRuntimeException if the subscription fails; the original exception is the cause
+     */
+    @Override
+    public PushConsumer attachQueue(final String queueName, final MessageListener listener) {
+        this.subscribeTable.put(queueName, listener);
+        try {
+            this.rocketmqPushConsumer.subscribe(queueName, "*");
+        } catch (MQClientException e) {
+            // Keep the cause so the reason for the failed subscription is not lost.
+            throw new OMSRuntimeException("-1",
+                String.format("RocketMQ push consumer can't attach to %s.", queueName), e);
+        }
+        return this;
+    }
+
+    /**
+     * Starts the underlying consumer; calling it again on a started consumer is a no-op.
+     *
+     * @throws OMSRuntimeException wrapping the {@link MQClientException} raised on start
+     */
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqPushConsumer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    /** Shuts the underlying consumer down if it was started. */
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqPushConsumer.shutdown();
+        }
+        this.started = false;
+    }
+
+    /**
+     * Bridges RocketMQ message delivery to the OMS {@link MessageListener} attached to the topic.
+     */
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        /**
+         * Dispatches the first message of the batch to its listener and waits for an ack.
+         *
+         * <p>The status defaults to {@code RECONSUME_LATER}; it only changes when the listener
+         * acknowledges through the context before the consume timeout elapses.
+         *
+         * @throws OMSRuntimeException if no listener is attached for the message topic
+         */
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
+            ConsumeConcurrentlyContext contextRMQ) {
+            // NOTE(review): only element 0 is handled - presumably the consume batch size is 1; confirm.
+            MessageExt rmqMsg = rmqMsgList.get(0);
+            BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
+
+            MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
+
+            if (listener == null) {
+                throw new OMSRuntimeException("-1",
+                    String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+            }
+
+            final KeyValue contextProperties = OMS.newKeyValue();
+            final CountDownLatch sync = new CountDownLatch(1);
+
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
+            ReceivedMessageContext context = new ReceivedMessageContext() {
+                @Override
+                public KeyValue properties() {
+                    return contextProperties;
+                }
+
+                @Override
+                public void ack() {
+                    // Publish the status BEFORE releasing the latch; otherwise the waiting
+                    // thread may wake up and still read RECONSUME_LATER.
+                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                    sync.countDown();
+                }
+
+                @Override
+                public void ack(final KeyValue properties) {
+                    // An ack that carries no explicit status is treated as a plain success.
+                    String status = ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name();
+                    if (properties != null && properties.containsKey(NonStandardKeys.MESSAGE_CONSUME_STATUS)) {
+                        status = properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS);
+                    }
+                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, status);
+                    sync.countDown();
+                }
+            };
+            long begin = System.currentTimeMillis();
+            listener.onMessage(omsMsg, context);
+            long costs = System.currentTimeMillis() - begin;
+            // The configured timeout is in minutes; convert in long arithmetic to avoid int overflow.
+            long timeoutMills = TimeUnit.MINUTES.toMillis(clientConfig.getRmqMessageConsumeTimeout());
+            try {
+                sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Stop waiting, but keep the interrupt visible to the owner of this thread.
+                Thread.currentThread().interrupt();
+            }
+
+            return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java new file mode 100644 index 0000000..43f80ae --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.OMS;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+/**
+ * {@link BytesMessage} implementation holding a byte array body together with two
+ * {@link KeyValue} containers: one for headers and one for properties.
+ */
+public class BytesMessageImpl implements BytesMessage {
+    private final KeyValue headers = OMS.newKeyValue();
+    private final KeyValue properties = OMS.newKeyValue();
+    private byte[] body;
+
+    /** Creates a message with empty headers and properties and no body. */
+    public BytesMessageImpl() {
+    }
+
+    /** Returns the body array as stored (no copy is made). */
+    @Override
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    /** Stores the given array as the body (no copy is made) and returns this message. */
+    @Override
+    public BytesMessage setBody(final byte[] body) {
+        this.body = body;
+        return this;
+    }
+
+    @Override
+    public KeyValue headers() {
+        return this.headers;
+    }
+
+    @Override
+    public KeyValue properties() {
+        return this.properties;
+    }
+
+    // ---- header setters: each stores the pair in the headers container and returns this ----
+
+    @Override
+    public Message putHeaders(final String key, final int value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putHeaders(final String key, final long value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putHeaders(final String key, final double value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putHeaders(final String key, final String value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    // ---- property setters: each stores the pair in the properties container and returns this ----
+
+    @Override
+    public Message putProperties(final String key, final int value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putProperties(final String key, final long value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putProperties(final String key, final double value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    @Override
+    public Message putProperties(final String key, final String value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    /** Renders all fields reflectively. */
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java new file mode 100644 index 0000000..7ce4a9b --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.openmessaging.rocketmq.domain;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Groups a pulled message with the message queue and process queue it is associated with,
+ * plus a mutable timestamp recording when consumption started.
+ */
+public class ConsumeRequest {
+    private final MessageExt messageExt;
+    private final MessageQueue messageQueue;
+    private final ProcessQueue processQueue;
+    // Stays 0 until setStartConsumeTimeMillis is called.
+    private long startConsumeTimeMillis;
+
+    /**
+     * @param messageExt the message to consume
+     * @param messageQueue the message queue associated with the message
+     * @param processQueue the process queue associated with the message
+     */
+    public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
+        final ProcessQueue processQueue) {
+        this.messageExt = messageExt;
+        this.messageQueue = messageQueue;
+        this.processQueue = processQueue;
+    }
+
+    public MessageExt getMessageExt() {
+        return this.messageExt;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+    public ProcessQueue getProcessQueue() {
+        return this.processQueue;
+    }
+
+    /** Returns the recorded consume start time in milliseconds. */
+    public long getStartConsumeTimeMillis() {
+        return this.startConsumeTimeMillis;
+    }
+
+    /** Records the consume start time in milliseconds. */
+    public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+        this.startConsumeTimeMillis = startConsumeTimeMillis;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
new file mode 100644
index 0000000..3639a3f
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+/**
+ * String constants for property keys that all carry the {@code rmq.} prefix.
+ * NOTE(review): the name suggests these are RocketMQ-specific keys outside the
+ * standard OMS property keys - confirm against the OMS specification.
+ */
+public interface NonStandardKeys {
+    String CONSUMER_GROUP = "rmq.consumer.group";
+    String PRODUCER_GROUP = "rmq.producer.group";
+    String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times";
+    String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout";
+    String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums";
+    String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
+    String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
+    String MESSAGE_DESTINATION = "rmq.message.destination";
+    String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
+    String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
new file mode 100644
index 0000000..228a9f0
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.SendResult;
+
+/**
+ * Immutable {@link SendResult} carrying a message id and a {@link KeyValue} of properties.
+ */
+public class SendResultImpl implements SendResult {
+    private final String messageId;
+    private final KeyValue properties;
+
+    /**
+     * @param messageId the id reported for the sent message
+     * @param properties the properties attached to this result (stored as given)
+     */
+    public SendResultImpl(final String messageId, final KeyValue properties) {
+        this.messageId = messageId;
+        this.properties = properties;
+    }
+
+    @Override
+    public String messageId() {
+        return this.messageId;
+    }
+
+    @Override
+    public KeyValue properties() {
+        return this.properties;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
new file mode 100644
index 0000000..8246bcd
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageFactory;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
+
+/**
+ * Base class for OMS producers backed by a RocketMQ {@link DefaultMQProducer}: it configures
+ * the producer, manages its lifecycle, creates bytes messages and maps send failures to OMS
+ * exceptions.
+ */
+abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
+    final static Logger log = ClientLogger.getLog();
+    final KeyValue properties;
+    final DefaultMQProducer rocketmqProducer;
+    // Guarded by the monitor of this object (see startup/shutdown).
+    private boolean started = false;
+    final ClientConfig clientConfig;
+
+    /**
+     * Creates and configures the underlying producer.
+     *
+     * @param properties client properties; the generated producer id is written back into it
+     * @throws OMSRuntimeException if the access points are missing
+     */
+    AbstractOMSProducer(final KeyValue properties) {
+        this.properties = properties;
+        this.rocketmqProducer = new DefaultMQProducer();
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String accessPoints = clientConfig.getOmsAccessPoints();
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        // OMS separates access points with ',', the name server address list uses ';'.
+        this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
+        this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
+
+        String producerId = buildInstanceName();
+        this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+        this.rocketmqProducer.setInstanceName(producerId);
+        this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+        properties.put(PropertyKeys.PRODUCER_ID, producerId);
+    }
+
+    /**
+     * Starts the underlying producer; calling it again on a started producer is a no-op.
+     *
+     * @throws OMSRuntimeException wrapping the {@link MQClientException} raised on start
+     */
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqProducer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    /** Shuts the underlying producer down if it was started. */
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqProducer.shutdown();
+        }
+        this.started = false;
+    }
+
+    /**
+     * Translates a failure raised while sending into the matching OMS exception.
+     *
+     * @param topic the topic of the message that failed
+     * @param msgId the id of the message that failed
+     * @param e the failure to translate; always kept as the cause of the result
+     * @return the OMS exception to throw; a generic one when no specific mapping applies
+     */
+    OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
+        if (e instanceof MQClientException) {
+            Throwable cause = e.getCause();
+            if (cause != null) {
+                if (cause instanceof RemotingTimeoutException) {
+                    return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+                        this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
+                } else if (cause instanceof MQBrokerException) {
+                    MQBrokerException brokerException = (MQBrokerException) cause;
+                    return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+                        topic, msgId, brokerException.getErrorMessage()), e);
+                } else if (cause instanceof RemotingConnectException) {
+                    // A connect failure is not an MQBrokerException; casting it would throw
+                    // a ClassCastException and hide the real error.
+                    return new OMSRuntimeException("-1", String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s",
+                        topic, msgId, cause.getMessage()), e);
+                }
+            }
+            // Exception thrown by local.
+            else {
+                MQClientException clientException = (MQClientException) e;
+                if (-1 == clientException.getResponseCode()) {
+                    return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
+                    return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                }
+            }
+        }
+        return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+    }
+
+    /**
+     * Rejects every message that is not a {@link BytesMessage}.
+     *
+     * @throws OMSNotSupportedException if the message has another type
+     */
+    protected void checkMessageType(Message message) {
+        if (!(message instanceof BytesMessage)) {
+            throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+        }
+    }
+
+    /** Creates a bytes message with the given body and the TOPIC header set. */
+    @Override
+    public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.TOPIC, topic);
+        return bytesMessage;
+    }
+
+    /** Creates a bytes message with the given body and the QUEUE header set. */
+    @Override
+    public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.QUEUE, queue);
+        return bytesMessage;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
new file mode 100644
index
0000000..2c00c60 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PropertyKeys; +import io.openmessaging.SendResult; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.promise.DefaultPromise; +import io.openmessaging.rocketmq.utils.OMSUtil; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendStatus; + +import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert; + +public class ProducerImpl extends AbstractOMSProducer implements Producer { + + public ProducerImpl(final KeyValue properties) { + super(properties); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public SendResult send(final Message message) { + return send(message, this.rocketmqProducer.getSendMsgTimeout()); + } + 
+ @Override + public SendResult send(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return send(message, timeout); + } + + private SendResult send(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); + if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { + log.error(String.format("Send message to RocketMQ failed, %s", message)); + throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); + } + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + return OMSUtil.sendResultConvert(rmqResult); + } catch (Exception e) { + log.error(String.format("Send message to RocketMQ failed, %s", message), e); + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + } + + @Override + public Promise<SendResult> sendAsync(final Message message) { + return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); + } + + @Override + public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? 
properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return sendAsync(message, timeout); + } + + private Promise<SendResult> sendAsync(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + final Promise<SendResult> promise = new DefaultPromise<>(); + try { + this.rocketmqProducer.send(rmqMessage, new SendCallback() { + @Override + public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + promise.set(OMSUtil.sendResultConvert(rmqResult)); + } + + @Override + public void onException(final Throwable e) { + promise.setFailure(e); + } + }, timeout); + } catch (Exception e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public void sendOneway(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + this.rocketmqProducer.sendOneway(rmqMessage); + } catch (Exception ignore) { //Ignore the oneway exception. 
+ } + } + + @Override + public void sendOneway(final Message message, final KeyValue properties) { + sendOneway(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java new file mode 100644 index 0000000..05225cc --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.SequenceProducer; +import io.openmessaging.rocketmq.utils.OMSUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; + +public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { + + private BlockingQueue<Message> msgCacheQueue; + + public SequenceProducerImpl(final KeyValue properties) { + super(properties); + this.msgCacheQueue = new LinkedBlockingQueue<>(); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public void send(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); + try { + Validators.checkMessage(rmqMessage, this.rocketmqProducer); + } catch (MQClientException e) { + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + msgCacheQueue.add(message); + } + + @Override + public void send(final Message message, final KeyValue properties) { + send(message); + } + + @Override + public synchronized void commit() { + List<Message> messages = new ArrayList<>(); + msgCacheQueue.drainTo(messages); + + List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>(); + + for (Message message : messages) { + rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); + } + + if (rmqMessages.size() == 0) { + return; + } + + try { + SendResult sendResult = this.rocketmqProducer.send(rmqMessages); + String[] msgIdArray = 
sendResult.getMsgId().split(","); + for (int i = 0; i < messages.size(); i++) { + Message message = messages.get(i); + message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); + } + } catch (Exception e) { + throw checkProducerException("", "", e); + } + } + + @Override + public synchronized void rollback() { + msgCacheQueue.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java new file mode 100644 index 0000000..c863ccf --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultPromise<V> implements Promise<V> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class); + private final Object lock = new Object(); + private volatile FutureState state = FutureState.DOING; + private V result = null; + private long timeout; + private long createTime; + private Throwable exception = null; + private List<PromiseListener<V>> promiseListenerList; + + public DefaultPromise() { + createTime = System.currentTimeMillis(); + promiseListenerList = new ArrayList<>(); + timeout = 5000; + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return state.isCancelledState(); + } + + @Override + public boolean isDone() { + return state.isDoneState(); + } + + @Override + public V get() { + return result; + } + + @Override + public V get(final long timeout) { + synchronized (lock) { + if (!isDoing()) { + return getValueOrThrowable(); + } + + if (timeout <= 0) { + try { + lock.wait(); + } catch (Exception e) { + cancel(e); + } + return getValueOrThrowable(); + } else { + long waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime > 0) { + for (;; ) { + try { + lock.wait(waitTime); + } catch (InterruptedException e) { + LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); + } + + if (!isDoing()) { + break; + } else { + waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime <= 0) { + break; + } + } + } + } + + if (isDoing()) { + timeoutSoCancel(); + } + } + return getValueOrThrowable(); + } + } + + @Override + public boolean set(final V value) { + if (value == null) + 
return false; + this.result = value; + return done(); + } + + @Override + public boolean setFailure(final Throwable cause) { + if (cause == null) + return false; + this.exception = cause; + return done(); + } + + @Override + public void addListener(final PromiseListener<V> listener) { + if (listener == null) { + throw new NullPointerException("FutureListener is null"); + } + + boolean notifyNow = false; + synchronized (lock) { + if (!isDoing()) { + notifyNow = true; + } else { + if (promiseListenerList == null) { + promiseListenerList = new ArrayList<>(); + } + promiseListenerList.add(listener); + } + } + + if (notifyNow) { + notifyListener(listener); + } + } + + @Override + public Throwable getThrowable() { + return exception; + } + + private void notifyListeners() { + if (promiseListenerList != null) { + for (PromiseListener<V> listener : promiseListenerList) { + notifyListener(listener); + } + } + } + + private boolean isSuccess() { + return isDone() && (exception == null); + } + + private void timeoutSoCancel() { + synchronized (lock) { + if (!isDoing()) { + return; + } + state = FutureState.CANCELLED; + exception = new RuntimeException("Get request result is timeout or interrupted"); + lock.notifyAll(); + } + notifyListeners(); + } + + private V getValueOrThrowable() { + if (exception != null) { + Throwable e = exception.getCause() != null ? 
exception.getCause() : exception; + throw new OMSRuntimeException("-1", e); + } + notifyListeners(); + return result; + } + + private boolean isDoing() { + return state.isDoingState(); + } + + private boolean done() { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.DONE; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } + + private void notifyListener(final PromiseListener<V> listener) { + try { + if (exception != null) + listener.operationFailed(this); + else + listener.operationCompleted(this); + } catch (Throwable t) { + LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); + } + } + + private boolean cancel(Exception e) { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.CANCELLED; + exception = e; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } +} +
