This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit efcc721462d78a6ce3633d1e0a4869c4111b429a Author: 翊名 <[email protected]> AuthorDate: Tue Nov 19 20:17:30 2019 +0800 feat(spring) add spring bean support --- .../rocketmq/ons/api/bean/BatchConsumerBean.java | 130 ++++++++++++++++ .../apache/rocketmq/ons/api/bean/ConsumerBean.java | 173 +++++++++++++++++++++ .../rocketmq/ons/api/bean/OrderConsumerBean.java | 131 ++++++++++++++++ .../rocketmq/ons/api/bean/OrderProducerBean.java | 93 +++++++++++ .../apache/rocketmq/ons/api/bean/ProducerBean.java | 108 +++++++++++++ .../ons/api/bean/TransactionProducerBean.java | 111 +++++++++++++ .../org/apache/rocketmq/ons/sample/MQConfig.java | 2 +- .../ons/sample/consumer/SimpleMQConsumer.java | 14 +- .../ons/sample/consumer/SimpleOrderConsumer.java | 16 +- .../ons/sample/producer/MQTimerProducer.java | 13 +- .../ons/sample/producer/SimpleMQProducer.java | 23 ++- .../ons/sample/producer/SimpleOrderProducer.java | 17 +- .../sample/producer/SimpleTransactionProducer.java | 16 +- 13 files changed, 826 insertions(+), 21 deletions(-) diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java new file mode 100644 index 0000000..34017fe --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/BatchConsumerBean.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.batch.BatchConsumer; +import io.openmessaging.api.batch.BatchMessageListener; +import io.openmessaging.api.bean.Subscription; +import java.util.Map; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code BatchConsumerBean} Used to integrate {@link BatchConsumer} into Spring. + */ +public class BatchConsumerBean implements BatchConsumer { + /** + * Need to inject this field, specify the properties of the construct {@code BatchConsumer} instance, see the + * specific supported properties{@link PropertyKeyConst} + * + * @see BatchConsumerBean#setProperties(Properties) + */ + private Properties properties; + + /** + * By injecting this field, complete the Topic subscription when starting {@code BatchConsumer} + * + * @see BatchConsumerBean#setSubscriptionTable(Map) + */ + private Map<Subscription, BatchMessageListener> subscriptionTable; + + private BatchConsumer batchConsumer; + + @Override + public boolean isStarted() { + return this.batchConsumer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.batchConsumer.isClosed(); + } + + /** + * Start the {@code BatchConsumer} instance, it is recommended to configure the init-method of the bean. 
+ */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.batchConsumer = ONSFactory.createBatchConsumer(this.properties); + + for (final Map.Entry<Subscription, BatchMessageListener> next : this.subscriptionTable.entrySet()) { + this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue()); + } + + this.batchConsumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.batchConsumer != null) { + this.batchConsumer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code BatchConsumer} instance, it is recommended to configure the destroy-method of the bean. + */ + @Override + public void shutdown() { + if (this.batchConsumer != null) { + this.batchConsumer.shutdown(); + } + } + + @Override + public void subscribe(final String topic, final String subExpression, final BatchMessageListener listener) { + if (null == this.batchConsumer) { + throw new ONSClientException("subscribe must be called after BatchConsumerBean started"); + } + this.batchConsumer.subscribe(topic, subExpression, listener); + } + + @Override + public void unsubscribe(final String topic) { + if (null == this.batchConsumer) { + throw new ONSClientException("unsubscribe must be called after BatchConsumerBean started"); + } + this.batchConsumer.unsubscribe(topic); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } + + public Map<Subscription, BatchMessageListener> getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable( + final Map<Subscription, BatchMessageListener> subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } +} diff --git 
a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java new file mode 100644 index 0000000..1555be9 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ConsumerBean.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.Consumer; +import io.openmessaging.api.ExpressionType; +import io.openmessaging.api.MessageListener; +import io.openmessaging.api.MessageSelector; +import io.openmessaging.api.bean.Subscription; +import io.openmessaging.api.bean.SubscriptionExt; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code ConsumerBean} is used to integrate {@link Consumer} into Spring Bean + */ +public class ConsumerBean implements Consumer { + /** + * You need to inject this field to specify the properties of the {@code Consumer} instance. 
For details, see {@link + * PropertyKeyConst}. + * + * @see ConsumerBean#setProperties(Properties) + */ + private Properties properties; + + /** + * By injecting this field, complete the Topic subscription when launching {@code Consumer} + * + * @see ConsumerBean#setSubscriptionTable(Map) + */ + private Map<Subscription, MessageListener> subscriptionTable; + + private Consumer consumer; + + /** + * Start the {@code Consumer} instance, it is recommended to configure the init-method of the bean. + */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.consumer = ONSFactory.createConsumer(this.properties); + + Iterator<Entry<Subscription, MessageListener>> it = this.subscriptionTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<Subscription, MessageListener> next = it.next(); + if ("com.aliyun.openservices.ons.api.impl.notify.ConsumerImpl".equals(this.consumer.getClass().getCanonicalName()) + && (next.getKey() instanceof SubscriptionExt)) { + SubscriptionExt subscription = (SubscriptionExt) next.getKey(); + for (Method method : this.consumer.getClass().getMethods()) { + if ("subscribeNotify".equals(method.getName())) { + try { + method.invoke(consumer, subscription.getTopic(), subscription.getExpression(), + subscription.isPersistence(), next.getValue()); + } catch (Exception e) { + throw new ONSClientException("subscribeNotify invoke exception", e); + } + break; + } + } + + } else { + Subscription subscription = next.getKey(); + if (subscription.getType() == null || ExpressionType.TAG.name().equals(subscription.getType())) { + + this.subscribe(subscription.getTopic(), subscription.getExpression(), next.getValue()); + + } else if (ExpressionType.SQL92.name().equals(subscription.getType())) { + + this.subscribe(subscription.getTopic(), 
MessageSelector.bySql(subscription.getExpression()), next.getValue()); + } else { + + throw new ONSClientException(String.format("Expression type %s is unknown!", subscription.getType())); + } + } + + } + + this.consumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.consumer != null) { + this.consumer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code Consumer} instance, it is recommended to configure the destroy-method of the bean. + */ + @Override + public void shutdown() { + if (this.consumer != null) { + this.consumer.shutdown(); + } + } + + @Override + public void subscribe(String topic, String subExpression, MessageListener listener) { + if (null == this.consumer) { + throw new ONSClientException("subscribe must be called after consumerBean started"); + } + this.consumer.subscribe(topic, subExpression, listener); + } + + @Override + public void subscribe(final String topic, final MessageSelector selector, final MessageListener listener) { + if (null == this.consumer) { + throw new ONSClientException("subscribe must be called after consumerBean started"); + } + this.consumer.subscribe(topic, selector, listener); + } + + @Override + public void unsubscribe(String topic) { + if (null == this.consumer) { + throw new ONSClientException("unsubscribe must be called after consumerBean started"); + } + this.consumer.unsubscribe(topic); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public Map<Subscription, MessageListener> getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable(Map<Subscription, MessageListener> subscriptionTable) { + this.subscriptionTable = subscriptionTable; + } + + @Override + public boolean isStarted() { + return this.consumer.isStarted(); + } + + @Override + public boolean isClosed() { + return 
this.consumer.isClosed(); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java new file mode 100644 index 0000000..dda41e6 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderConsumerBean.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.MessageSelector; +import io.openmessaging.api.bean.Subscription; +import io.openmessaging.api.order.MessageOrderListener; +import io.openmessaging.api.order.OrderConsumer; +import java.util.Map; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code OrderConsumerBean} is used to integrate {@link OrderConsumer} into Spring Bean + */ +public class OrderConsumerBean implements OrderConsumer { + /** + * Need to inject this field, specify the properties of the construct {@code OrderConsumer} instance, the specific + * supported properties are detailed in {@link PropertyKeyConst} + * + * @see OrderConsumerBean#setProperties(Properties) + */ + private Properties properties; + + /** + * By injecting this field, complete the subscription for Topic when launching {@code OrderConsumer} + * + * @see OrderConsumerBean#setSubscriptionTable(Map) + */ + private Map<Subscription, MessageOrderListener> subscriptionTable; + + private OrderConsumer orderConsumer; + + @Override + public boolean isStarted() { + return this.orderConsumer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.orderConsumer.isClosed(); + } + + /** + * Start the {@code OrderConsumer} instance, it is recommended to configure the init-method of the bean. 
+ */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + if (null == this.subscriptionTable) { + throw new ONSClientException("subscriptionTable not set"); + } + + this.orderConsumer = ONSFactory.createOrderedConsumer(this.properties); + + for (final Map.Entry<Subscription, MessageOrderListener> next : this.subscriptionTable.entrySet()) { + this.subscribe(next.getKey().getTopic(), next.getKey().getExpression(), next.getValue()); + } + + this.orderConsumer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.orderConsumer != null) { + this.orderConsumer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code OrderConsumer} instance, it is recommended to configure the destroy-method of the bean. + */ + @Override + public void shutdown() { + if (this.orderConsumer != null) { + this.orderConsumer.shutdown(); + } + } + + @Override + public void subscribe(final String topic, final String subExpression, final MessageOrderListener listener) { + if (null == this.orderConsumer) { + throw new ONSClientException("subscribe must be called after OrderConsumerBean started"); + } + this.orderConsumer.subscribe(topic, subExpression, listener); + } + + @Override + public void subscribe(String topic, MessageSelector selector, MessageOrderListener listener) { + if (null == this.orderConsumer) { + throw new ONSClientException("subscribe must be called after OrderConsumerBean started"); + } + this.orderConsumer.subscribe(topic, selector, listener); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } + + public Map<Subscription, MessageOrderListener> getSubscriptionTable() { + return subscriptionTable; + } + + public void setSubscriptionTable( + final Map<Subscription, MessageOrderListener> subscriptionTable) { + 
this.subscriptionTable = subscriptionTable; + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java new file mode 100644 index 0000000..8f57487 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/OrderProducerBean.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.Message; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.order.OrderProducer; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code OrderProducerBean} is used to integrate {@link OrderProducer} into Spring Bean. 
+ */ +public class OrderProducerBean implements OrderProducer { + /** + * Need to inject this field, specify the properties of the construct {@code OrderProducer} instance, the specific + * supported properties are detailed in {@link PropertyKeyConst} + * + * @see OrderProducerBean#setProperties(Properties) + */ + private Properties properties; + + private OrderProducer orderProducer; + + /** + * Start the {@code OrderProducer} instance, it is recommended to configure the init-method of the bean. + */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.orderProducer = ONSFactory.createOrderProducer(this.properties); + this.orderProducer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.orderProducer != null) { + this.orderProducer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code OrderProducer} instance, it is recommended to configure the destroy-method of the bean. 
+ */ + @Override + public void shutdown() { + if (this.orderProducer != null) { + this.orderProducer.shutdown(); + } + } + + @Override + public boolean isStarted() { + return this.orderProducer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.orderProducer.isClosed(); + } + + @Override + public SendResult send(final Message message, final String shardingKey) { + return this.orderProducer.send(message, shardingKey); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(final Properties properties) { + this.properties = properties; + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java new file mode 100644 index 0000000..26d9922 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.Message; +import io.openmessaging.api.Producer; +import io.openmessaging.api.SendCallback; +import io.openmessaging.api.SendResult; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code ProducerBean} for integrating {@link Producer} into Spring Bean + */ +public class ProducerBean implements Producer { + /** + * You need to inject this field to specify the properties of the {@code Producer} instance. For details, see {@link + * PropertyKeyConst} + * + * @see ProducerBean#setProperties(Properties) + */ + private Properties properties; + private Producer producer; + + /** + * Start the {@code Producer} instance, it is recommended to configure the init-method of the bean. + */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.producer = ONSFactory.createProducer(this.properties); + this.producer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.producer != null) { + this.producer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code Producer} instance, it is recommended to configure the destroy-method of the bean + */ + @Override + public void shutdown() { + if (this.producer != null) { + this.producer.shutdown(); + } + } + + @Override + public SendResult send(Message message) { + return this.producer.send(message); + } + + @Override + public void sendOneway(Message message) { + this.producer.sendOneway(message); + } + + @Override + public void sendAsync(Message message, SendCallback sendCallback) { + this.producer.sendAsync(message, sendCallback); + } + + @Override + public void setCallbackExecutor(final ExecutorService callbackExecutor) { + 
this.producer.setCallbackExecutor(callbackExecutor); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @Override + public boolean isStarted() { + return this.producer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.producer.isClosed(); + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java new file mode 100644 index 0000000..c39e732 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.ons.api.bean; + +import io.openmessaging.api.Message; +import io.openmessaging.api.SendResult; +import io.openmessaging.api.transaction.LocalTransactionChecker; +import io.openmessaging.api.transaction.LocalTransactionExecuter; +import io.openmessaging.api.transaction.TransactionProducer; +import java.util.Properties; +import org.apache.rocketmq.ons.api.ONSFactory; +import org.apache.rocketmq.ons.api.PropertyKeyConst; +import org.apache.rocketmq.ons.api.exception.ONSClientException; + +/** + * {@code TransactionProducerBean} is used to integrate {@link TransactionProducer} into Spring Bean + */ +public class TransactionProducerBean implements TransactionProducer { + /** + * Need to inject this field, specify the properties of the construct {@code TransactionProducer} instance, the + * specific supported properties are detailed in {@link PropertyKeyConst} + * + * @see TransactionProducerBean#setProperties(Properties) + */ + private Properties properties; + + /** + * Need to inject this field, {@code TransactionProducer} will send the transaction message will rely on the object + * for transaction status checkback + * + * @see TransactionProducerBean#setLocalTransactionChecker(LocalTransactionChecker) + */ + private LocalTransactionChecker localTransactionChecker; + + private TransactionProducer transactionProducer; + + /** + * + */ + @Override + public void start() { + if (null == this.properties) { + throw new ONSClientException("properties not set"); + } + + this.transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker); + this.transactionProducer.start(); + } + + @Override + public void updateCredential(Properties credentialProperties) { + if (this.transactionProducer != null) { + this.transactionProducer.updateCredential(credentialProperties); + } + } + + /** + * Close the {@code TransactionProducer} instance, it is recommended to configure the destroy-method of the bean. 
+ */ + @Override + public void shutdown() { + if (this.transactionProducer != null) { + this.transactionProducer.shutdown(); + } + } + + @Override + public SendResult send(Message message, LocalTransactionExecuter executer, Object arg) { + return this.transactionProducer.send(message, executer, arg); + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public LocalTransactionChecker getLocalTransactionChecker() { + return localTransactionChecker; + } + + public void setLocalTransactionChecker(LocalTransactionChecker localTransactionChecker) { + this.localTransactionChecker = localTransactionChecker; + } + + @Override + public boolean isStarted() { + return this.transactionProducer.isStarted(); + } + + @Override + public boolean isClosed() { + return this.transactionProducer.isClosed(); + } +} diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java index 8a59dc3..c8d300e 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/MQConfig.java @@ -31,6 +31,6 @@ public class MQConfig { /** * NAMESRV_ADDR */ - public static final String NAMESRV_ADDR = "47.107.167.190:9876"; + public static final String NAMESRV_ADDR = "127.0.0.1:9876"; } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java index 34fe915..51f558b 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleMQConsumer.java @@ -27,14 +27,23 @@ import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleMQConsumer { public static void main(String[] args) { - 
MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + Consumer consumer = messagingAccessPoint.createConsumer(consumerProperties); + /* + * Alternatively, you can use the ONSFactory to create instance directly. + * <pre> + * {@code + * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * OrderConsumer consumer = ONSFactory.createOrderedConsumer(consumerProperties); + * } + * </pre> + */ consumer.subscribe(MQConfig.TOPIC, MQConfig.TAG, new MessageListenerImpl()); consumer.start(); System.out.printf("Consumer start success. 
%n"); @@ -44,5 +53,6 @@ public class SimpleMQConsumer { } catch (InterruptedException e) { e.printStackTrace(); } + consumer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java index d3aaa07..d991474 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/consumer/SimpleOrderConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.ons.sample.consumer; - import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; @@ -31,13 +30,23 @@ import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleOrderConsumer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID); consumerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); consumerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); OrderConsumer consumer = messagingAccessPoint.createOrderedConsumer(consumerProperties); + + /* + * Alternatively, you can use the ONSFactory to create instance directly. 
+ * <pre> + * {@code + * consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * OrderConsumer consumer = ONSFactory.createOrderedConsumer(consumerProperties); + * } + * </pre> + */ + consumer.subscribe(MQConfig.ORDER_TOPIC, MQConfig.TAG, new MessageOrderListener() { @Override @@ -54,5 +63,6 @@ public class SimpleOrderConsumer { } catch (InterruptedException e) { e.printStackTrace(); } + consumer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java index 7cc5682..fcf7554 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/MQTimerProducer.java @@ -29,13 +29,24 @@ import org.apache.rocketmq.ons.sample.MQConfig; public class MQTimerProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://47.107.167.190:9876"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); Producer producer = messagingAccessPoint.createProducer(producerProperties); + + /* + * Alternatively, you can use the ONSFactory to create instance directly. + * <pre> + * {@code + * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * OrderProducer producer = ONSFactory.createOrderProducer(producerProperties); + * } + * </pre> + */ + producer.start(); System.out.printf("Producer Started. 
%n"); diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java index 2b53968..5bb7f9e 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleMQProducer.java @@ -17,30 +17,37 @@ package org.apache.rocketmq.ons.sample.producer; import io.openmessaging.api.Message; +import io.openmessaging.api.MessagingAccessPoint; +import io.openmessaging.api.OMS; import io.openmessaging.api.Producer; import io.openmessaging.api.SendResult; import io.openmessaging.api.exception.OMSRuntimeException; import java.util.Properties; -import org.apache.rocketmq.ons.api.ONSFactory; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.sample.MQConfig; - // io.openmessaging.api.xxx => com.aliyun.openservices.ons.api.xxxx - - public class SimpleMQProducer { - public static void main(String[] args) { + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); - Producer producer = ONSFactory.createProducer(producerProperties); + + Producer producer = messagingAccessPoint.createProducer(producerProperties); + /* + * Alternatively, you can use the ONSFactory to create instance directly. 
+ * <pre> + * {@code + * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * Producer producer = ONSFactory.createProducer(producerProperties); + * } + * </pre> + */ producer.start(); System.out.printf("Producer Started %n"); @@ -56,5 +63,7 @@ public class SimpleMQProducer { e.printStackTrace(); } } + + producer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java index 2d49a48..a454297 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleOrderProducer.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.ons.sample.producer; - import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; @@ -30,14 +29,25 @@ import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleOrderProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.ORDER_GROUP_ID); producerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); producerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); OrderProducer producer = messagingAccessPoint.createOrderProducer(producerProperties); + + + /* + * Alternatively, you can use the ONSFactory to create instance directly. 
+ * <pre> + * {@code + * producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * OrderProducer producer = ONSFactory.createOrderProducer(producerProperties); + * } + * </pre> + */ + producer.start(); System.out.printf("Producer Started. %n"); @@ -55,5 +65,6 @@ public class SimpleOrderProducer { e.printStackTrace(); } } + producer.shutdown(); } } diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java index e5834f7..44da502 100644 --- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java +++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java @@ -16,12 +16,10 @@ */ package org.apache.rocketmq.ons.sample.producer; - import io.openmessaging.api.Message; import io.openmessaging.api.MessagingAccessPoint; import io.openmessaging.api.OMS; import io.openmessaging.api.SendResult; -import io.openmessaging.api.exception.OMSRuntimeException; import io.openmessaging.api.transaction.LocalTransactionExecuter; import io.openmessaging.api.transaction.TransactionProducer; import io.openmessaging.api.transaction.TransactionStatus; @@ -34,15 +32,25 @@ import org.apache.rocketmq.ons.sample.MQConfig; public class SimpleTransactionProducer { public static void main(String[] args) { - MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://[email protected]/us-east"); + MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://127.0.0.1:9876"); Properties tranProducerProperties = new Properties(); tranProducerProperties.setProperty(PropertyKeyConst.GROUP_ID, MQConfig.GROUP_ID); tranProducerProperties.setProperty(PropertyKeyConst.AccessKey, MQConfig.ACCESS_KEY); tranProducerProperties.setProperty(PropertyKeyConst.SecretKey, MQConfig.SECRET_KEY); - 
tranProducerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl(); TransactionProducer transactionProducer = messagingAccessPoint.createTransactionProducer(tranProducerProperties, localTransactionChecker); + + /* + * Alternatively, you can use the ONSFactory to create instance directly. + * <pre> + * {@code + * tranProducerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MQConfig.NAMESRV_ADDR); + * TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(tranProducerProperties, localTransactionChecker); + * } + * </pre> + */ + transactionProducer.start(); Message message = new Message(MQConfig.TOPIC, MQConfig.TAG, "MQ send transaction message test".getBytes());
