This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit c2b46201ef46bfb4dfefc1933f1c492c3eb8c734 Author: 翊名 <[email protected]> AuthorDate: Tue Nov 19 17:13:59 2019 +0800 feat(client) adapt to OpenMessaging api --- ons-core/ons-auth4client/pom.xml | 2 +- ons-core/ons-client/pom.xml | 4 +- .../rocketmq/MessagingAccessPointImpl.java | 46 +++-- .../org/apache/rocketmq/ons/api/Constants.java | 14 +- .../org/apache/rocketmq/ons/api/ONSFactory.java | 225 +++++++++++++++++++++ .../org/apache/rocketmq/ons/api/ONSFactoryAPI.java | 52 +++++ .../api/{impl/constant => }/PropertyKeyConst.java | 45 ++--- .../rocketmq/ons/api/PropertyValueConst.java | 17 +- .../ons/api/exception/ONSClientException.java | 25 ++- .../apache/rocketmq/ons/api/impl/MQClientInfo.java | 5 +- .../rocketmq/ons/api/impl/ONSFactoryImpl.java} | 54 ++--- .../ons/api/impl/rocketmq/BatchConsumerImpl.java | 26 +-- .../ons/api/impl/rocketmq/ConsumerImpl.java | 32 +-- .../ons/api/impl/rocketmq/ONSClientAbstract.java | 29 +-- .../ons/api/impl/rocketmq/ONSConsumerAbstract.java | 21 +- .../rocketmq/ons/api/impl/rocketmq/ONSUtil.java | 9 +- .../ons/api/impl/rocketmq/OrderConsumerImpl.java | 27 +-- .../ons/api/impl/rocketmq/OrderProducerImpl.java | 19 +- .../ons/api/impl/rocketmq/ProducerImpl.java | 58 +++--- .../api/impl/rocketmq/TransactionProducerImpl.java | 19 +- .../services/io.openmessaging.api.ONSFactoryAPI | 1 + .../impl/rocketmq/NameServerAutoUpdateTest.java | 17 +- .../impl/rocketmq/ONSClientTokenUpdateTest.java | 23 +-- ons-core/ons-trace-core/pom.xml | 2 +- ons-core/pom.xml | 2 +- ons-sample/pom.xml | 4 +- .../org/apache/rocketmq/ons/sample/MQConfig.java | 4 +- .../ons/sample/consumer/MessageListenerImpl.java | 8 +- .../ons/sample/consumer/SimpleMQConsumer.java | 9 +- .../ons/sample/consumer/SimpleOrderConsumer.java | 17 +- .../producer/LocalTransactionCheckerImpl.java | 6 +- .../ons/sample/producer/MQTimerProducer.java | 18 +- .../ons/sample/producer/SimpleMQProducer.java | 22 +- .../ons/sample/producer/SimpleOrderProducer.java | 17 +- .../sample/producer/SimpleTransactionProducer.java | 24 ++- pom.xml | 2 +- 36 files changed, 586 insertions(+), 319 deletions(-) diff --git a/ons-core/ons-auth4client/pom.xml b/ons-core/ons-auth4client/pom.xml index fe24766..cfd6185 100644 --- a/ons-core/ons-auth4client/pom.xml +++ b/ons-core/ons-auth4client/pom.xml @@ -19,7 +19,7 @@ <parent> <artifactId>ons-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ons-auth4client</artifactId> diff --git a/ons-core/ons-client/pom.xml b/ons-core/ons-client/pom.xml index 057bce5..b9558d8 100644 --- a/ons-core/ons-client/pom.xml +++ b/ons-core/ons-client/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.rocketmq</groupId> <artifactId>ons-all</artifactId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <packaging>jar</packaging> @@ -39,7 +39,7 @@ <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> - <version>1.1.0-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </dependency> <dependency> <groupId>${project.groupId}</groupId> diff --git a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 3a0a96b..bb90f64 100644 --- a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -17,23 +17,26 @@ package io.openmessaging.rocketmq; -import io.openmessaging.Constants; -import io.openmessaging.Consumer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMSResponseStatus; -import io.openmessaging.Producer; -import io.openmessaging.PullConsumer; -import io.openmessaging.batch.BatchConsumer; -import io.openmessaging.order.OrderConsumer; -import io.openmessaging.order.OrderProducer; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionProducer; -import io.openmessaging.transaction.TransactionStatus; +import io.openmessaging.api.Consumer; +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMSBuiltinKeys; +import io.openmessaging.api.OMSResponseStatus; +import io.openmessaging.api.Producer; +import io.openmessaging.api.PullConsumer; +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.order.OrderConsumer; +import io.openmessaging.api.order.OrderProducer; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.TransactionProducer; +import io.openmessaging.api.transaction.TransactionStatus; import java.util.Properties; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.impl.MQClientInfo; import org.apache.rocketmq.ons.api.impl.rocketmq.BatchConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil; @@ -52,45 +55,60 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { @Override public String version() { - return "1.1.3"; + return MQClientInfo.currentVersion; } @Override public Properties attributes() { return null; } + private void injectNameServerAddress(Properties properties) { + if (properties.getProperty(PropertyKeyConst.NAMESRV_ADDR) == null) { + String nameServerAddress = this.attributes.getProperty(OMSBuiltinKeys.ACCESS_POINTS); + properties.put(PropertyKeyConst.NAMESRV_ADDR, nameServerAddress); + } + } + @Override public PullConsumer createPullConsumer(Properties properties) { + injectNameServerAddress(properties); + properties.put(PropertyKeyConst.NAMESRV_ADDR, this.attributes.getProperty(OMSBuiltinKeys.ACCESS_POINTS)); throw OMSResponseStatus.generateException(OMSResponseStatus.STATUS_1101); } @Override public Producer createProducer(final Properties properties) { + injectNameServerAddress(properties); return new ProducerImpl(ONSUtil.extractProperties(properties)); } @Override public Consumer createConsumer(final Properties properties) { + injectNameServerAddress(properties); return new ConsumerImpl(ONSUtil.extractProperties(properties)); } @Override public BatchConsumer createBatchConsumer(final Properties properties) { + injectNameServerAddress(properties); return new BatchConsumerImpl(ONSUtil.extractProperties(properties)); } @Override public OrderProducer createOrderProducer(final Properties properties) { + injectNameServerAddress(properties); return new OrderProducerImpl(ONSUtil.extractProperties(properties)); } @Override public OrderConsumer createOrderedConsumer(final Properties properties) { + injectNameServerAddress(properties); return new OrderConsumerImpl(ONSUtil.extractProperties(properties)); } @Override public TransactionProducer createTransactionProducer(Properties properties, final LocalTransactionChecker checker) { + injectNameServerAddress(properties); return new TransactionProducerImpl(ONSUtil.extractProperties(properties), new TransactionCheckListener() { @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java similarity index 61% copy from ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java index 1d76cf6..48d8f67 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java @@ -14,17 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionStatus; +package org.apache.rocketmq.ons.api; -public class LocalTransactionCheckerImpl implements LocalTransactionChecker { - - @Override - public TransactionStatus check(Message msg) { - System.out.printf("Receive transaction check back request, MsgId: %s%n", msg.getMsgID()); - return TransactionStatus.CommitTransaction; - } +public class Constants { + public static final String TRANSACTION_ID = "__transactionId__"; } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java new file mode 100644 index 0000000..847a6ae --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactory.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.OMS; +import io.openmessaging.api.Producer; +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.order.OrderConsumer; +import io.openmessaging.api.order.OrderProducer; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.TransactionProducer; +import java.util.Properties; + +/** + * {@link OMS} is recommended. + */ +@Deprecated +public class ONSFactory { + + private static ONSFactoryAPI onsFactory = null; + + static { + + try { + Class<?> factoryClass = + ONSFactory.class.getClassLoader().loadClass( + "org.apache.rocketmq.ons.api.impl.ONSFactoryImpl"); + onsFactory = (ONSFactoryAPI) factoryClass.newInstance(); + } catch (Throwable e) { + e.printStackTrace(); + } + + } + + /** + * Create Producer + * + * <p> + * <code>properties</code> + * Require: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ol> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * <li>{@link PropertyKeyConst#SendMsgTimeoutMillis}</li> + * <li>{@link PropertyKeyConst#NAMESRV_ADDR} will override {@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * </p> + * + * + * <p> + * sample: + * <pre> + * Properties props = ...; + * Producer producer = ONSFactory.createProducer(props); + * producer.start(); + * + * + * Message msg = ...; + * SendResult result = producer.send(msg); + * + * producer.shutdown(); + * </pre> + * </p> + * + * @param properties Producer's configuration + * @return {@link Producer} Thread safe {@link Producer} instance + */ + public static Producer createProducer(final Properties properties) { + return onsFactory.createProducer(properties); + } + + /** + * Create OrderProducer + * <p> + * <code>properties</code> + * Require: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#NAMESRV_ADDR}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * <li>{@link PropertyKeyConst#SendMsgTimeoutMillis}</li> + * </ul> + * </p> + * + * @param properties Producer configuration + * @return {@code OrderProducer} Thread safe {@link OrderProducer} instance + */ + public static OrderProducer createOrderProducer(final Properties properties) { + return onsFactory.createOrderProducer(properties); + } + + /** + * Create Transaction Producer + * <p> + * <code>properties</code>Requires: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#NAMESRV_ADDR}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * <li>{@link PropertyKeyConst#SendMsgTimeoutMillis}</li> + * <li>{@link PropertyKeyConst#CheckImmunityTimeInSeconds}</li> + * </ul> + * </p> + * + * @param properties Producer configuration + * @return {@code TransactionProducer} Thread safe {@link TransactionProducer} instance + */ + public static TransactionProducer createTransactionProducer(final Properties properties, + final LocalTransactionChecker checker) { + return onsFactory.createTransactionProducer(properties, checker); + } + + /** + * Create Consumer + * <p> + * <code>properties</code> + * Requires: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#ConsumeThreadNums}</li> + * <li>{@link PropertyKeyConst#ConsumeTimeout}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * </ul> + * </p> + * + * @param properties Consumer's configuration + * @return {@code Consumer} Thread safe {@link Consumer} instance + */ + public static Consumer createConsumer(final Properties properties) { + return onsFactory.createConsumer(properties); + } + + /** + * Create BatchConsumer + * <p> + * <code>properties</code> + * Requires: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#ConsumeThreadNums}</li> + * <li>{@link PropertyKeyConst#ConsumeTimeout}</li> + * <li>{@link PropertyKeyConst#ConsumeMessageBatchMaxSize}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * </ul> + * </p> + * + * @param properties BatchConsumer's configuration + * @return {@code BatchConsumer} Thread safe {@link BatchConsumer} instance + */ + public static BatchConsumer createBatchConsumer(final Properties properties) { + return onsFactory.createBatchConsumer(properties); + } + + /** + * Create Order Consumer + * <p> + * <code>properties</code> + * Requires: + * <ol> + * <li>{@link PropertyKeyConst#GROUP_ID}</li> + * <li>{@link PropertyKeyConst#AccessKey}</li> + * <li>{@link PropertyKeyConst#SecretKey}</li> + * <li>{@link PropertyKeyConst#ONSAddr}</li> + * </ol> + * Optional: + * <ul> + * <li>{@link PropertyKeyConst#ConsumeThreadNums}</li> + * <li>{@link PropertyKeyConst#ConsumeTimeout}</li> + * <li>{@link PropertyKeyConst#OnsChannel}</li> + * </ul> + * </p> + * + * @param properties Consumer's configuration + * @return {@code OrderConsumer} Thread safe {@link OrderConsumer} instance + */ + public static OrderConsumer createOrderedConsumer(final Properties properties) { + return onsFactory.createOrderedConsumer(properties); + } + +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java new file mode 100644 index 0000000..67186b4 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/ONSFactoryAPI.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.Producer; +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.order.OrderConsumer; +import io.openmessaging.api.order.OrderProducer; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.TransactionProducer; +import java.util.Properties; + +/** + * {@link MessagingAccessPoint} is recommended. + */ +@Deprecated +public interface ONSFactoryAPI { + + Producer createProducer(final Properties properties); + + + Consumer createConsumer(final Properties properties); + + + BatchConsumer createBatchConsumer(final Properties properties); + + + OrderProducer createOrderProducer(final Properties properties); + + + OrderConsumer createOrderedConsumer(final Properties properties); + + + TransactionProducer createTransactionProducer(final Properties properties, + final LocalTransactionChecker checker); +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/constant/PropertyKeyConst.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java similarity index 74% rename from ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/constant/PropertyKeyConst.java rename to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java index ce4cf50..b3d2670 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/constant/PropertyKeyConst.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java @@ -14,23 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.ons.api.impl.constant; +package org.apache.rocketmq.ons.api; -public class PropertyKeyConst { +import io.openmessaging.api.OMSBuiltinKeys; - public static final String MessageModel = "MessageModel"; - - /** - * Deprecated, replaced with GROUP_ID - */ - @Deprecated - public static final String ProducerId = "ProducerId"; +public class PropertyKeyConst implements OMSBuiltinKeys { - /** - * Deprecated, replaced with GROUP_ID - */ - @Deprecated - public static final String ConsumerId = "ConsumerId"; + public static final String MessageModel = "MessageModel"; public static final String GROUP_ID = "GROUP_ID"; @@ -42,8 +32,10 @@ public class PropertyKeyConst { public static final String SendMsgTimeoutMillis = "SendMsgTimeoutMillis"; + @Deprecated public static final String ONSAddr = "ONSAddr"; + @Deprecated public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; public static final String ConsumeThreadNums = "ConsumeThreadNums"; @@ -72,30 +64,21 @@ public class PropertyKeyConst { public static final String InstanceName = "InstanceName"; - public static final String MsgTraceSwitch = "MsgTraceSwitch"; - - public static final String MqttMessageId = "mqttMessageId"; - - public static final String MqttMessage = "mqttMessage"; - - public static final String MqttPublishRetain = "mqttRetain"; - - public static final String MqttPublishDubFlag = "mqttPublishDubFlag"; + @Deprecated + public static final String EXACTLYONCE_DELIVERY = "exactlyOnceDelivery"; - public static final String MqttSecondTopic = "mqttSecondTopic"; + public static final String QOS = "qos"; - public static final String MqttClientId = "clientId"; + public static final String EXACTLYONCE_RM_REFRESHINTERVAL = "exactlyOnceRmRefreshInterval"; - public static final String MqttQOS = "qoslevel"; + public static final String MAX_BATCH_MESSAGE_COUNT = "maxBatchMessageCount"; - public static final String INSTANCE_ID = "INSTANCE_ID"; + public static final String INSTANCE_ID = "instanceId"; - public static final String EXACTLYONCE_DELIVERY = "exactlyOnceDelivery"; + public static final String LANGUAGE_IDENTIFIER = "languageIdentifier"; - public static final String EXACTLYONCE_RM_REFRESHINTERVAL = "exactlyOnceRmRefreshInterval"; + public static final String MsgTraceSwitch = "msgTraceSwitch"; - public static final String MAX_BATCH_MESSAGE_COUNT = "maxBatchMessageCount"; - public static final String LANGUAGE_IDENTIFIER = "languageIdentifier"; } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java similarity index 61% copy from ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java index 1d76cf6..c748447 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyValueConst.java @@ -14,17 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.ons.sample.producer; +package org.apache.rocketmq.ons.api; -import io.openmessaging.Message; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionStatus; -public class LocalTransactionCheckerImpl implements LocalTransactionChecker { +public class PropertyValueConst { - @Override - public TransactionStatus check(Message msg) { - System.out.printf("Receive transaction check back request, MsgId: %s%n", msg.getMsgID()); - return TransactionStatus.CommitTransaction; - } + + public static final String BROADCASTING = "BROADCASTING"; + + + public static final String CLUSTERING = "CLUSTERING"; } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java similarity index 62% copy from ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java index 1d76cf6..7f6daaf 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/exception/ONSClientException.java @@ -14,17 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.ons.sample.producer; +package org.apache.rocketmq.ons.api.exception; -import io.openmessaging.Message; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionStatus; +import io.openmessaging.api.exception.OMSRuntimeException; -public class LocalTransactionCheckerImpl implements LocalTransactionChecker { +public class ONSClientException extends OMSRuntimeException { - @Override - public TransactionStatus check(Message msg) { - System.out.printf("Receive transaction check back request, MsgId: %s%n", msg.getMsgID()); - return TransactionStatus.CommitTransaction; + public ONSClientException() { + } + + public ONSClientException(Throwable cause) { + super(cause); + } + + public ONSClientException(String message) { + super(message); + } + + public ONSClientException(String message, Throwable cause) { + super(message, cause); } } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/MQClientInfo.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/MQClientInfo.java index 96ce1ca..aaca85c 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/MQClientInfo.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/MQClientInfo.java @@ -24,14 +24,15 @@ import org.apache.rocketmq.common.MQVersion; public class MQClientInfo { public static int versionCode = MQVersion.CURRENT_VERSION; + public static String currentVersion; static { try { InputStream stream = MQClientInfo.class.getClassLoader().getResourceAsStream("ons_client_info.properties"); Properties properties = new Properties(); properties.load(stream); - String pkgVersion = String.valueOf(properties.get("version")); - versionCode = Integer.MAX_VALUE - Integer.valueOf(pkgVersion.replaceAll("[^0-9]", "")); + currentVersion = String.valueOf(properties.get("version")); + versionCode = Integer.MAX_VALUE - Integer.valueOf(currentVersion.replaceAll("[^0-9]", "")); } catch (Exception ignore) { } } diff --git a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java similarity index 74% copy from ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java index 3a0a96b..18ac9e4 100644 --- a/ons-core/ons-client/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/ONSFactoryImpl.java @@ -14,26 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.rocketmq.ons.api.impl; -package io.openmessaging.rocketmq; - -import io.openmessaging.Constants; -import io.openmessaging.Consumer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMSResponseStatus; -import io.openmessaging.Producer; -import io.openmessaging.PullConsumer; -import io.openmessaging.batch.BatchConsumer; -import io.openmessaging.order.OrderConsumer; -import io.openmessaging.order.OrderProducer; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionProducer; -import io.openmessaging.transaction.TransactionStatus; +import io.openmessaging.api.Consumer; +import io.openmessaging.api.Message; +import io.openmessaging.api.OMS; +import io.openmessaging.api.Producer; +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.order.OrderConsumer; +import io.openmessaging.api.order.OrderProducer; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.TransactionProducer; +import io.openmessaging.api.transaction.TransactionStatus; import java.util.Properties; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.ONSFactoryAPI; import org.apache.rocketmq.ons.api.impl.rocketmq.BatchConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ConsumerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ONSUtil; @@ -42,27 +40,11 @@ import org.apache.rocketmq.ons.api.impl.rocketmq.OrderProducerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.ProducerImpl; import org.apache.rocketmq.ons.api.impl.rocketmq.TransactionProducerImpl; -public class MessagingAccessPointImpl implements MessagingAccessPoint { - - private Properties attributes; - - public MessagingAccessPointImpl(Properties attributes) { - this.attributes = attributes; - } - - @Override - public String version() { - return "1.1.3"; - } - - @Override public Properties attributes() { - return null; - } - - @Override public PullConsumer createPullConsumer(Properties properties) { - throw OMSResponseStatus.generateException(OMSResponseStatus.STATUS_1101); - } - +/** + * Recommend to use {@link OMS} to create Producer or Consumer instance. + */ +@Deprecated +public class ONSFactoryImpl implements ONSFactoryAPI { @Override public Producer createProducer(final Properties properties) { return new ProducerImpl(ONSUtil.extractProperties(properties)); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java index 8a77c5e..81dd7ec 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/BatchConsumerImpl.java @@ -17,14 +17,13 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Action; -import io.openmessaging.Constants; -import io.openmessaging.ConsumeContext; -import io.openmessaging.Message; -import io.openmessaging.PropertyValueConst; -import io.openmessaging.batch.BatchConsumer; -import io.openmessaging.batch.BatchMessageListener; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Action; +import io.openmessaging.api.ConsumeContext; +import io.openmessaging.api.Message; + +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.batch.BatchMessageListener; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -39,7 +38,10 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyValueConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; @Generated("ons-client") public class BatchConsumerImpl extends ONSConsumerAbstract implements BatchConsumer { @@ -73,11 +75,11 @@ public class BatchConsumerImpl extends ONSConsumerAbstract implements BatchConsu @Override public void subscribe(String topic, String subExpression, BatchMessageListener listener) { if (null == topic) { - throw new OMSRuntimeException("topic is null"); + throw new ONSClientException("topic is null"); } if (null == listener) { - throw new OMSRuntimeException("listener is null"); + throw new ONSClientException("listener is null"); } this.subscribeTable.put(topic, listener); super.subscribe(topic, subExpression); @@ -109,7 +111,7 @@ public class BatchConsumerImpl extends ONSConsumerAbstract implements BatchConsu BatchMessageListener listener = BatchConsumerImpl.this.subscribeTable.get(msgList.get(0).getTopic()); if (null == listener) { - throw new OMSRuntimeException("BatchMessageListener is null"); + throw new ONSClientException("BatchMessageListener is null"); } final ConsumeContext context = new ConsumeContext(); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java index 0c5c696..8a45880 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ConsumerImpl.java @@ -17,19 +17,16 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Action; -import io.openmessaging.Constants; -import io.openmessaging.ConsumeContext; -import io.openmessaging.Consumer; -import io.openmessaging.Message; -import io.openmessaging.MessageListener; -import io.openmessaging.MessageSelector; -import io.openmessaging.PropertyValueConst; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Action; +import io.openmessaging.api.ConsumeContext; +import io.openmessaging.api.Consumer; +import io.openmessaging.api.Message; +import io.openmessaging.api.MessageListener; +import io.openmessaging.api.MessageSelector; import java.util.List; import java.util.Map; import java.util.Properties; - import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Generated; @@ -38,7 +35,10 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyValueConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; @Generated("ons-client") public class ConsumerImpl extends ONSConsumerAbstract implements Consumer { @@ -63,11 +63,11 @@ public class ConsumerImpl extends ONSConsumerAbstract implements Consumer { @Override public void subscribe(String topic, String subExpression, MessageListener listener) { if (null == topic) { - throw new OMSRuntimeException("topic is null"); + throw new ONSClientException("topic is null"); } if (null == listener) { - throw new OMSRuntimeException("listener is null"); + throw new ONSClientException("listener is null"); } this.subscribeTable.put(topic, listener); super.subscribe(topic, subExpression); @@ -76,11 +76,11 @@ public class ConsumerImpl extends ONSConsumerAbstract implements Consumer { @Override public void subscribe(final String topic, final MessageSelector selector, final MessageListener listener) { if (null == topic) { - throw new OMSRuntimeException("topic is null"); + throw new ONSClientException("topic is null"); } if (null == listener) { - throw new OMSRuntimeException("listener is null"); + throw new ONSClientException("listener is null"); } this.subscribeTable.put(topic, listener); super.subscribe(topic, selector); @@ -110,7 +110,7 @@ public class ConsumerImpl extends ONSConsumerAbstract implements Consumer { } MessageListener listener = ConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (null == listener) { - throw new OMSRuntimeException("MessageListener is null"); + throw new ONSClientException("MessageListener is null"); } final ConsumeContext context = new ConsumeContext(); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java index 1471c56..6b78740 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java @@ -17,9 +17,9 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Credentials; -import io.openmessaging.LifeCycle; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Credentials; +import io.openmessaging.api.LifeCycle; import java.util.Properties; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -33,8 +33,9 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.api.impl.util.NameAddrUtils; import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; @@ -75,16 +76,16 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { this.sessionCredentials.updateContent(properties); if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey()))) { - throw new OMSRuntimeException("please set access key"); + throw new ONSClientException("please set access key"); } if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey()))) { - throw new OMSRuntimeException("please set secret key"); + throw new ONSClientException("please set secret key"); } if (null == this.sessionCredentials.getOnsChannel()) { - throw new OMSRuntimeException("please set ons channel"); + throw new ONSClientException("please set ons channel"); } this.nameServerAddr = getNameSrvAddrFromProperties(); @@ -96,7 +97,7 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { } this.nameServerAddr = fetchNameServerAddr(); if (null == nameServerAddr) { - throw new OMSRuntimeException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED)); + throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED)); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @@ -140,7 +141,7 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { LOGGER.info("connected to user-defined ons addr server, {} success, {}", property, nsAddrs); return nsAddrs; } else { - throw new OMSRuntimeException(FAQ.errorMessage("Can not find name server with onsAddr " + property, FAQ.FIND_NS_FAILED)); + throw new ONSClientException(FAQ.errorMessage("Can not find name server with onsAddr " + property, FAQ.FIND_NS_FAILED)); } } } @@ -209,14 +210,14 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { protected void checkONSProducerServiceState(DefaultMQProducerImpl producer) { switch (producer.getServiceState()) { case CREATE_JUST: - throw new OMSRuntimeException( + throw new ONSClientException( FAQ.errorMessage(String.format("You do not have start the producer[" + getPid() + "], %s", producer.getServiceState()), FAQ.SERVICE_STATE_WRONG)); case SHUTDOWN_ALREADY: - throw new OMSRuntimeException(FAQ.errorMessage(String.format("Your producer has been shut down, %s", producer.getServiceState()), + throw new ONSClientException(FAQ.errorMessage(String.format("Your producer has been shut down, %s", producer.getServiceState()), FAQ.SERVICE_STATE_WRONG)); case START_FAILED: - throw new OMSRuntimeException(FAQ.errorMessage( + throw new ONSClientException(FAQ.errorMessage( String.format("When you start your service throws an exception, %s", producer.getServiceState()), FAQ.SERVICE_STATE_WRONG)); case RUNNING: break; @@ -241,13 +242,13 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && (null == credentialProperties.getProperty(SessionCredentials.AccessKey) || "".equals(credentialProperties.getProperty(SessionCredentials.AccessKey)))) { - throw new OMSRuntimeException("update credential failed. please set access key."); + throw new ONSClientException("update credential failed. please set access key."); } if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && (null == credentialProperties.getProperty(SessionCredentials.SecretKey) || "".equals(credentialProperties.getProperty(SessionCredentials.SecretKey)))) { - throw new OMSRuntimeException("update credential failed. please set secret key"); + throw new ONSClientException("update credential failed. please set secret key"); } this.sessionCredentials.updateContent(credentialProperties); } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java index f9488d5..d82feeb 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java @@ -17,15 +17,16 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.MessageSelector; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.MessageSelector; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.tracehook.OnsConsumeMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; @@ -48,9 +49,9 @@ public class ONSConsumerAbstract extends ONSClientAbstract { public ONSConsumerAbstract(final Properties properties) { super(properties); - String consumerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ConsumerId)); + String consumerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID)); if (StringUtils.isEmpty(consumerGroup)) { - throw new OMSRuntimeException("ConsumerId property is null"); + throw new ONSClientException("ConsumerId property is null"); } this.defaultMQPushConsumer = @@ -144,7 +145,7 @@ public class ONSConsumerAbstract extends ONSClientAbstract { try { this.defaultMQPushConsumer.subscribe(topic, subExpression); } catch (MQClientException e) { - throw new OMSRuntimeException("defaultMQPushConsumer subscribe exception", e); + throw new ONSClientException("defaultMQPushConsumer subscribe exception", e); } } @@ -153,7 +154,7 @@ public class ONSConsumerAbstract extends ONSClientAbstract { String type = org.apache.rocketmq.common.filter.ExpressionType.TAG; if (selector != null) { if (selector.getType() == null) { - throw new OMSRuntimeException("Expression type is null!"); + throw new ONSClientException("Expression type is null!"); } subExpression = selector.getSubExpression(); type = selector.getType().name(); @@ -165,13 +166,13 @@ public class ONSConsumerAbstract extends ONSClientAbstract { } else if (org.apache.rocketmq.common.filter.ExpressionType.TAG.equals(type)) { messageSelector = org.apache.rocketmq.client.consumer.MessageSelector.byTag(subExpression); } else { - throw new OMSRuntimeException(String.format("Expression type %s is unknown!", type)); + throw new ONSClientException(String.format("Expression type %s is unknown!", type)); } try { this.defaultMQPushConsumer.subscribe(topic, messageSelector); } catch (MQClientException e) { - throw new OMSRuntimeException("Consumer subscribe exception", e); + throw new ONSClientException("Consumer subscribe exception", e); } } @@ -187,7 +188,7 @@ public class ONSConsumerAbstract extends ONSClientAbstract { super.start(); } } catch (Exception e) { - throw new OMSRuntimeException(e.getMessage()); + throw new ONSClientException(e.getMessage()); } } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java index c90d9c3..14c581c 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSUtil.java @@ -17,9 +17,9 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Message; -import io.openmessaging.MessageAccessor; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessageAccessor; import java.lang.reflect.Field; import java.util.HashSet; import java.util.Iterator; @@ -29,6 +29,7 @@ import java.util.Properties; import java.util.Set; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.ons.api.exception.ONSClientException; public class ONSUtil { private static final Set<String> RESERVED_KEY_SET_RMQ = new HashSet<String>(); @@ -68,7 +69,7 @@ public class ONSUtil { public static org.apache.rocketmq.common.message.Message msgConvert(Message message) { org.apache.rocketmq.common.message.Message msgRMQ = new org.apache.rocketmq.common.message.Message(); if (message == null) { - throw new OMSRuntimeException("\'message\' is null"); + throw new ONSClientException("\'message\' is null"); } if (message.getTopic() != null) { diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java index d9c9b8a..07dc23c 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderConsumerImpl.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Message; -import io.openmessaging.MessageSelector; -import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.order.ConsumeOrderContext; -import io.openmessaging.order.MessageOrderListener; -import io.openmessaging.order.OrderAction; -import io.openmessaging.order.OrderConsumer; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessageSelector; +import io.openmessaging.api.order.ConsumeOrderContext; +import io.openmessaging.api.order.MessageOrderListener; +import io.openmessaging.api.order.OrderAction; +import io.openmessaging.api.order.OrderConsumer; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -33,7 +33,8 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; public class OrderConsumerImpl extends ONSConsumerAbstract implements OrderConsumer { private final ConcurrentHashMap<String, MessageOrderListener> subscribeTable = new ConcurrentHashMap<String, MessageOrderListener>(); @@ -58,11 +59,11 @@ public class OrderConsumerImpl extends ONSConsumerAbstract implements OrderConsu @Override public void subscribe(String topic, String subExpression, MessageOrderListener listener) { if (null == topic) { - throw new OMSRuntimeException("topic is null"); + throw new ONSClientException("topic is null"); } if (null == listener) { - throw new OMSRuntimeException("listener is null"); + throw new ONSClientException("listener is null"); } this.subscribeTable.put(topic, listener); super.subscribe(topic, subExpression); @@ -71,11 +72,11 @@ public class OrderConsumerImpl extends ONSConsumerAbstract implements OrderConsu @Override public void subscribe(final String topic, final MessageSelector selector, final MessageOrderListener listener) { if (null == topic) { - throw new OMSRuntimeException("topic is null"); + throw new ONSClientException("topic is null"); } if (null == listener) { - throw new OMSRuntimeException("listener is null"); + throw new ONSClientException("listener is null"); } this.subscribeTable.put(topic, listener); super.subscribe(topic, selector); @@ -91,7 +92,7 @@ public class OrderConsumerImpl extends ONSConsumerAbstract implements OrderConsu MessageOrderListener listener = OrderConsumerImpl.this.subscribeTable.get(msg.getTopic()); if (null == listener) { - throw new OMSRuntimeException("MessageOrderListener is null"); + throw new ONSClientException("MessageOrderListener is null"); } final ConsumeOrderContext context = new ConsumeOrderContext(); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java index d46b769..c840bb2 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Message; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.order.OrderProducer; + +import io.openmessaging.api.Message; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.order.OrderProducer; import java.util.List; import java.util.Properties; import org.apache.commons.lang3.StringUtils; @@ -29,7 +29,8 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; @@ -43,7 +44,7 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce public OrderProducerImpl(final Properties properties) { super(properties); - String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID)); if (StringUtils.isEmpty(producerGroup)) { producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; } @@ -109,7 +110,7 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce super.start(); } } catch (Exception e) { - throw new OMSRuntimeException(e.getMessage()); + throw new ONSClientException(e.getMessage()); } } @@ -124,7 +125,7 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce @Override public SendResult send(final Message message, final String shardingKey) { if (UtilAll.isBlank(shardingKey)) { - throw new OMSRuntimeException("\'shardingKey\' is blank."); + throw new ONSClientException("\'shardingKey\' is blank."); } message.setShardingKey(shardingKey); this.checkONSProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl()); @@ -148,7 +149,7 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce sendResult.setMessageId(sendResultRMQ.getMsgId()); return sendResult; } catch (Exception e) { - throw new OMSRuntimeException("defaultMQProducer send order exception", e); + throw new ONSClientException("defaultMQProducer send order exception", e); } } } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java index d40b785..cccdf05 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java @@ -17,19 +17,14 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Message; -import io.openmessaging.OnExceptionContext; -import io.openmessaging.Producer; -import io.openmessaging.SendCallback; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.api.Message; +import io.openmessaging.api.OnExceptionContext; +import io.openmessaging.api.Producer; +import io.openmessaging.api.SendCallback; +import io.openmessaging.api.SendResult; import java.util.Properties; import java.util.concurrent.ExecutorService; - -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; -import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; -import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -37,13 +32,15 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; - - +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; -import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; +import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType; +import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.LanguageCode; public class ProducerImpl extends ONSClientAbstract implements Producer { @@ -53,7 +50,7 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { public ProducerImpl(final Properties properties) { super(properties); - String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID)); if (StringUtils.isEmpty(producerGroup)) { producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; } @@ -123,7 +120,7 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { super.start(); } } catch (Exception e) { - throw new OMSRuntimeException(e.getMessage()); + throw new ONSClientException(e.getMessage()); } } @@ -199,10 +196,10 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { @Override public void onException(Throwable e) { - //String topic = new String(message.getTopic()); - //String msgId = new String(message.getMsgID()); + String topic = new String(message.getTopic()); + String msgId = new String(message.getMsgID()); LOGGER.error(String.format("Send message async Exception, %s", message), e); - OMSRuntimeException onsEx = checkProducerException(message.getTopic(), message.getMsgID(), e); + ONSClientException onsEx = checkProducerException(topic, msgId, e); OnExceptionContext context = new OnExceptionContext(); context.setTopic(message.getTopic()); context.setMessageId(message.getMsgID()); @@ -221,39 +218,34 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { return sendResult; } - @Override - public SendResult send(Message message, String shardingKey) { - return null; - } - - private OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) { + private ONSClientException checkProducerException(String topic, String msgId, Throwable e) { if (e instanceof MQClientException) { if (e.getCause() != null) { if (e.getCause() instanceof RemotingConnectException) { - return new OMSRuntimeException( + return new ONSClientException( FAQ.errorMessage(String.format("Connect broker failed, Topic=%s, msgId=%s", topic, msgId), FAQ.CONNECT_BROKER_FAILED)); } else if (e.getCause() instanceof RemotingTimeoutException) { - return new OMSRuntimeException(FAQ.errorMessage(String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", + return new ONSClientException(FAQ.errorMessage(String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", this.defaultMQProducer.getSendMsgTimeout(), topic, msgId), FAQ.SEND_MSG_TO_BROKER_TIMEOUT)); } else if (e.getCause() instanceof MQBrokerException) { MQBrokerException excep = (MQBrokerException) e.getCause(); - return new OMSRuntimeException(FAQ.errorMessage( + return new ONSClientException(FAQ.errorMessage( String.format("Receive a broker exception, Topic=%s, msgId=%s, %s", topic, msgId, excep.getErrorMessage()), FAQ.BROKER_RESPONSE_EXCEPTION)); } } else { MQClientException excep = (MQClientException) e; if (-1 == excep.getResponseCode()) { - return new OMSRuntimeException( + return new ONSClientException( FAQ.errorMessage(String.format("Topic does not exist, Topic=%s, msgId=%s", topic, msgId), FAQ.TOPIC_ROUTE_NOT_EXIST)); } else if (ResponseCode.MESSAGE_ILLEGAL == excep.getResponseCode()) { - return new OMSRuntimeException( + return new ONSClientException( FAQ.errorMessage(String.format("ONS Client check message exception, Topic=%s, msgId=%s", topic, msgId), FAQ.CLIENT_CHECK_MSG_EXCEPTION)); } } } - return new OMSRuntimeException("defaultMQProducer send exception", e); + return new ONSClientException("defaultMQProducer send exception", e); } } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java index 3c44f14..66b3014 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java @@ -17,12 +17,12 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Constants; -import io.openmessaging.Message; -import io.openmessaging.SendResult; -import io.openmessaging.transaction.LocalTransactionExecutor; -import io.openmessaging.transaction.TransactionProducer; -import io.openmessaging.transaction.TransactionStatus; + +import io.openmessaging.api.Message; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.transaction.LocalTransactionExecuter; +import io.openmessaging.api.transaction.TransactionProducer; +import io.openmessaging.api.transaction.TransactionStatus; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; @@ -31,7 +31,8 @@ import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.Constants; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; @@ -47,7 +48,7 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa public TransactionProducerImpl(Properties properties, TransactionCheckListener transactionCheckListener) { super(properties); this.properties = properties; - String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.ProducerId)); + String producerGroup = properties.getProperty(PropertyKeyConst.GROUP_ID, properties.getProperty(PropertyKeyConst.GROUP_ID)); if (StringUtils.isEmpty(producerGroup)) { producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; } @@ -123,7 +124,7 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa } @Override - public SendResult send(final Message message, final LocalTransactionExecutor executer, Object arg) { + public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) { this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message); org.apache.rocketmq.client.producer.TransactionSendResult sendResultRMQ = null; diff --git a/ons-core/ons-client/src/main/resources/META-INF/services/io.openmessaging.api.ONSFactoryAPI b/ons-core/ons-client/src/main/resources/META-INF/services/io.openmessaging.api.ONSFactoryAPI new file mode 100644 index 0000000..057a032 --- /dev/null +++ b/ons-core/ons-client/src/main/resources/META-INF/services/io.openmessaging.api.ONSFactoryAPI @@ -0,0 +1 @@ +org.apache.rocketmq.ons.api.impl.ONSFactoryImpl \ No newline at end of file diff --git a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java index 52e7d2d..3d94d74 100644 --- a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java +++ b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/NameServerAutoUpdateTest.java @@ -17,14 +17,15 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Consumer; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.Producer; -import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.order.OrderProducer; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.Producer; +import io.openmessaging.api.order.OrderProducer; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.junit.Before; import org.junit.Rule; import org.junit.rules.ExpectedException; @@ -51,7 +52,7 @@ public class NameServerAutoUpdateTest { @org.junit.Test public void testNamesrv_setOnsAddr_invalid() { - expectedException.expect(OMSRuntimeException.class); + expectedException.expect(ONSClientException.class); expectedException.expectMessage("onsAddr " + "xxx"); Properties prop = buildProps(); diff --git a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java index 8a7386d..f97d5d7 100644 --- a/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java +++ b/ons-core/ons-client/src/test/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientTokenUpdateTest.java @@ -17,20 +17,20 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import io.openmessaging.Consumer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.OMSBuiltinKeys; -import io.openmessaging.Producer; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.Producer; +import io.openmessaging.api.SendResult; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.Assert; import org.junit.Before; @@ -176,7 +176,7 @@ public class ONSClientTokenUpdateTest { try { consumer.updateCredential(buildProps("nak", "", "ntoken", ONSChannel.CLOUD.name())); } catch (Exception e) { - Assert.assertTrue(e instanceof OMSRuntimeException); + Assert.assertTrue(e instanceof ONSClientException); } Assert.assertEquals("ak", credentials.getAccessKey()); @@ -187,8 +187,7 @@ public class ONSClientTokenUpdateTest { private static Properties buildProps(String ak, String sk, String token, String channel) { Properties properties = new Properties(); - properties.put(PropertyKeyConst.ConsumerId, "CID_STS_TEST_MOLING"); - properties.put(PropertyKeyConst.ProducerId, "PID_STS_TEST_MOLING"); + properties.put(PropertyKeyConst.GROUP_ID, "CID_STS_TEST_MOLING"); properties.put(PropertyKeyConst.AccessKey, ak); properties.put(PropertyKeyConst.SecretKey, sk); properties.put(PropertyKeyConst.SecurityToken, token); diff --git a/ons-core/ons-trace-core/pom.xml b/ons-core/ons-trace-core/pom.xml index 579e842..422b192 100644 --- a/ons-core/ons-trace-core/pom.xml +++ b/ons-core/ons-trace-core/pom.xml @@ -21,7 +21,7 @@ <parent> <artifactId>ons-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ons-trace-core</artifactId> diff --git a/ons-core/pom.xml b/ons-core/pom.xml index f93ca53..7a3da6a 100644 --- a/ons-core/pom.xml +++ b/ons-core/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.rocketmq</groupId> <artifactId>ons-parent</artifactId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <inceptionYear>2012</inceptionYear> diff --git a/ons-sample/pom.xml b/ons-sample/pom.xml index 7846256..271f7bb 100644 --- a/ons-sample/pom.xml +++ b/ons-sample/pom.xml @@ -19,7 +19,7 @@ <parent> <artifactId>ons-parent</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -36,7 +36,7 @@ <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> - <version>1.1.0-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> </dependency> </dependencies> diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java index 2ef0e44..8a59dc3 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java @@ -20,7 +20,7 @@ public class MQConfig { /** * RocketMQ config */ - public static final String TOPIC = "xxxx"; + public static final String TOPIC = "TopicTest"; public static final String GROUP_ID = "GID-xxxx"; public static final String ORDER_TOPIC = "xxxx"; public static final String ORDER_GROUP_ID = "GID-xxxx"; @@ -31,6 +31,6 @@ public class MQConfig { /** * NAMESRV_ADDR */ - public static final String NAMESRV_ADDR = "xxxxxx"; + public static final String NAMESRV_ADDR = "47.107.167.190:9876"; } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java index 90c3f4b..5ed0a48 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/MessageListenerImpl.java @@ -16,10 +16,10 @@ */ package org.apache.rocketmq.ons.sample.consumer; -import io.openmessaging.Action; -import io.openmessaging.ConsumeContext; -import io.openmessaging.Message; -import io.openmessaging.MessageListener; +import io.openmessaging.api.Action; +import io.openmessaging.api.ConsumeContext; +import io.openmessaging.api.Message; +import io.openmessaging.api.MessageListener; public class MessageListenerImpl implements MessageListener { @Override diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java index 513a67b..34fe915 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java @@ -16,11 +16,12 @@ */ package org.apache.rocketmq.ons.sample.consumer; -import io.openmessaging.Consumer; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleMQConsumer { diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java index 35a9458..d3aaa07 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java @@ -16,15 +16,16 @@ */ package org.apache.rocketmq.ons.sample.consumer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.order.ConsumeOrderContext; -import io.openmessaging.order.MessageOrderListener; -import io.openmessaging.order.OrderAction; -import io.openmessaging.order.OrderConsumer; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.order.ConsumeOrderContext; +import io.openmessaging.api.order.MessageOrderListener; +import io.openmessaging.api.order.OrderAction; +import io.openmessaging.api.order.OrderConsumer; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleOrderConsumer { diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java index 1d76cf6..973dc4d 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/LocalTransactionCheckerImpl.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.transaction.LocalTransactionChecker; -import io.openmessaging.transaction.TransactionStatus; +import io.openmessaging.api.Message; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.TransactionStatus; public class LocalTransactionCheckerImpl implements LocalTransactionChecker { diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java index aa9f7f2..7cc5682 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java @@ -16,25 +16,25 @@ */ package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.Producer; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.Producer; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; public class MQTimerProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://47.107.167.190:9876"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); Producer producer = messagingAccessPoint.createProducer(producerProperties); producer.start(); System.out.printf("Producer Started. %n"); diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java index 8d25ef4..2b53968 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java @@ -16,28 +16,32 @@ */ package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.Producer; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.api.Message; +import io.openmessaging.api.Producer; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; + +// io.openmessaging.api.xxx => com.aliyun.openservices.ons.api.xxxx + + + public class SimpleMQProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); - Producer producer = messagingAccessPoint.createProducer(producerProperties); + Producer producer = ONSFactory.createProducer(producerProperties); + producer.start(); System.out.printf("Producer Started %n"); diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java index 82282aa..2d49a48 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java @@ -16,20 +16,21 @@ */ package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.order.OrderProducer; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; +import io.openmessaging.api.order.OrderProducer; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleOrderProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID); diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java index c8790b9..e5834f7 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java @@ -16,17 +16,19 @@ */ package org.apache.rocketmq.ons.sample.producer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.OMS; -import io.openmessaging.SendResult; -import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.transaction.LocalTransactionExecutor; -import io.openmessaging.transaction.TransactionProducer; -import io.openmessaging.transaction.TransactionStatus; + +import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.exception.OMSRuntimeException; +import io.openmessaging.api.transaction.LocalTransactionExecuter; +import io.openmessaging.api.transaction.TransactionProducer; +import io.openmessaging.api.transaction.TransactionStatus; import java.util.Date; import java.util.Properties; -import org.apache.rocketmq.ons.api.impl.constant.PropertyKeyConst; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleTransactionProducer { @@ -47,7 +49,7 @@ public class SimpleTransactionProducer { for (int i = 0; i < 10; i++) { try { - SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecutor() { + SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { System.out.printf("Execute local transaction and return TransactionStatus. %n"); @@ -55,7 +57,7 @@ public class SimpleTransactionProducer { } }, null); assert sendResult != null; - } catch (OMSRuntimeException e) { + } catch (ONSClientException e) { System.out.printf(new Date() + " Send mq message failed! Topic is: %s%n", MQConfig.TOPIC); e.printStackTrace(); } diff --git a/pom.xml b/pom.xml index ec1c4fa..71433c0 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.apache.rocketmq</groupId> <artifactId>ons-parent</artifactId> - <version>1.0.1-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> <packaging>pom</packaging> <name>ons-parent ${project.version}</name> <description>
