Polish the OMS config loading mechanism.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/2e3c1b00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/2e3c1b00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/2e3c1b00 Branch: refs/heads/openmessaging-impl Commit: 2e3c1b00b7641634bd4a8e22f78544abd3d2d3dd Parents: 625ba07 Author: yukon <[email protected]> Authored: Wed Apr 19 15:31:17 2017 +0800 Committer: yukon <[email protected]> Committed: Wed Apr 19 15:31:17 2017 +0800 ---------------------------------------------------------------------- .../openmessaging/SimplePullConsumer.java | 8 +- .../io/openmessaging/rocketmq/ClientConfig.java | 194 +++++++++++++++++++ .../rocketmq/MessagingAccessPointImpl.java | 1 + .../java/io/openmessaging/rocketmq/OMSUtil.java | 182 ----------------- .../rocketmq/consumer/LocalMessageCache.java | 53 ++--- .../rocketmq/consumer/PullConsumerImpl.java | 26 +-- .../rocketmq/consumer/PushConsumerImpl.java | 39 ++-- .../rocketmq/producer/AbstractOMSProducer.java | 19 +- .../rocketmq/producer/ProducerImpl.java | 4 +- .../rocketmq/producer/SequenceProducerImpl.java | 2 +- .../openmessaging/rocketmq/utils/BeanUtils.java | 185 ++++++++++++++++++ .../openmessaging/rocketmq/utils/OMSUtil.java | 182 +++++++++++++++++ 12 files changed, 623 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 8dd7b23..86cb696 100644 --- 
a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -48,9 +48,11 @@ public class SimplePullConsumer { while (true) { Message message = consumer.poll(); - String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); - System.out.println("Received one message: " + msgId); - consumer.ack(msgId); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.println("Received one message: " + msgId); + consumer.ack(msgId); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java new file mode 100644 index 0000000..fbca21a --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.openmessaging.rocketmq; + +import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class ClientConfig implements PropertyKeys, NonStandardKeys { + private String omsDriverImpl; + private String omsAccessPoints; + private String omsNamespace; + private String omsProducerId; + private String omsConsumerId; + private int omsOperationTimeout = 5000; + private String omsRoutingName; + private String omsOperatorName; + private String omsDstQueue; + private String omsSrcTopic; + private String rmqConsumerGroup; + private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; + private int rmqMaxRedeliveryTimes = 16; + private int rmqMessageConsumeTimeout = 15; //In minutes + private int rmqMaxConsumeThreadNums = 64; + private int rmqMinConsumeThreadNums = 20; + private String rmqMessageDestination; + private int rmqPullMessageBatchNums = 32; + private int rmqPullMessageCacheCapacity = 1000; + + public String getOmsDriverImpl() { + return omsDriverImpl; + } + + public void setOmsDriverImpl(final String omsDriverImpl) { + this.omsDriverImpl = omsDriverImpl; + } + + public String getOmsAccessPoints() { + return omsAccessPoints; + } + + public void setOmsAccessPoints(final String omsAccessPoints) { + this.omsAccessPoints = omsAccessPoints; + } + + public String getOmsNamespace() { + return omsNamespace; + } + + public void setOmsNamespace(final String omsNamespace) { + this.omsNamespace = omsNamespace; + } + + public String getOmsProducerId() { + return omsProducerId; + } + + public void setOmsProducerId(final String omsProducerId) { + this.omsProducerId = omsProducerId; + } + + public String getOmsConsumerId() { + return omsConsumerId; + } + + public void setOmsConsumerId(final String omsConsumerId) { + this.omsConsumerId = omsConsumerId; + } + + public int getOmsOperationTimeout() { + return omsOperationTimeout; + } + + public void setOmsOperationTimeout(final int omsOperationTimeout) { + 
this.omsOperationTimeout = omsOperationTimeout; + } + + public String getOmsRoutingName() { + return omsRoutingName; + } + + public void setOmsRoutingName(final String omsRoutingName) { + this.omsRoutingName = omsRoutingName; + } + + public String getOmsOperatorName() { + return omsOperatorName; + } + + public void setOmsOperatorName(final String omsOperatorName) { + this.omsOperatorName = omsOperatorName; + } + + public String getOmsDstQueue() { + return omsDstQueue; + } + + public void setOmsDstQueue(final String omsDstQueue) { + this.omsDstQueue = omsDstQueue; + } + + public String getOmsSrcTopic() { + return omsSrcTopic; + } + + public void setOmsSrcTopic(final String omsSrcTopic) { + this.omsSrcTopic = omsSrcTopic; + } + + public String getRmqConsumerGroup() { + return rmqConsumerGroup; + } + + public void setRmqConsumerGroup(final String rmqConsumerGroup) { + this.rmqConsumerGroup = rmqConsumerGroup; + } + + public String getRmqProducerGroup() { + return rmqProducerGroup; + } + + public void setRmqProducerGroup(final String rmqProducerGroup) { + this.rmqProducerGroup = rmqProducerGroup; + } + + public int getRmqMaxRedeliveryTimes() { + return rmqMaxRedeliveryTimes; + } + + public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) { + this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes; + } + + public int getRmqMessageConsumeTimeout() { + return rmqMessageConsumeTimeout; + } + + public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) { + this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout; + } + + public int getRmqMaxConsumeThreadNums() { + return rmqMaxConsumeThreadNums; + } + + public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) { + this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums; + } + + public int getRmqMinConsumeThreadNums() { + return rmqMinConsumeThreadNums; + } + + public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) { + this.rmqMinConsumeThreadNums = 
rmqMinConsumeThreadNums; + } + + public String getRmqMessageDestination() { + return rmqMessageDestination; + } + + public void setRmqMessageDestination(final String rmqMessageDestination) { + this.rmqMessageDestination = rmqMessageDestination; + } + + public int getRmqPullMessageBatchNums() { + return rmqPullMessageBatchNums; + } + + public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) { + this.rmqPullMessageBatchNums = rmqPullMessageBatchNums; + } + + public int getRmqPullMessageCacheCapacity() { + return rmqPullMessageCacheCapacity; + } + + public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { + this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index af1695b..a897da5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -31,6 +31,7 @@ import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.SequenceProducerImpl; +import io.openmessaging.rocketmq.utils.OMSUtil; public class MessagingAccessPointImpl implements MessagingAccessPoint { private final KeyValue accessPointProperties; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java ---------------------------------------------------------------------- diff --git 
a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java deleted file mode 100644 index 87037ee..0000000 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq; - -import io.openmessaging.BytesMessage; -import io.openmessaging.KeyValue; -import io.openmessaging.MessageHeader; -import io.openmessaging.OMS; -import io.openmessaging.SendResult; -import io.openmessaging.rocketmq.domain.BytesMessageImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; -import io.openmessaging.rocketmq.domain.SendResultImpl; -import java.lang.reflect.Field; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageAccessor; - -public class OMSUtil { - - /** - * Builds a OMS client instance name. 
- * - * @return a unique instance name - */ - public static String buildInstanceName() { - return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); - } - - public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { - org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); - rmqMessage.setBody(omsMessage.getBody()); - - KeyValue headers = omsMessage.headers(); - KeyValue properties = omsMessage.properties(); - - //All destinations in RocketMQ use Topic - if (headers.containsKey(MessageHeader.TOPIC)) { - rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); - } else { - rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); - } - - for (String key : properties.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); - } - - //Headers has a high priority - for (String key : headers.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); - } - - return rmqMessage; - } - - public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { - BytesMessage omsMsg = new BytesMessageImpl(); - omsMsg.setBody(rmqMsg.getBody()); - - KeyValue headers = omsMsg.headers(); - KeyValue properties = omsMsg.properties(); - - final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet(); - - for (final Map.Entry<String, String> entry : entries) { - if (isOMSHeader(entry.getKey())) { - headers.put(entry.getKey(), entry.getValue()); - } else { - properties.put(entry.getKey(), entry.getValue()); - } - } - - omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); - if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || - 
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { - omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); - } else { - omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); - } - omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); - omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); - return omsMsg; - } - - public static boolean isOMSHeader(String value) { - for (Field field : MessageHeader.class.getDeclaredFields()) { - try { - if (field.get(MessageHeader.class).equals(value)) { - return true; - } - } catch (IllegalAccessException e) { - return false; - } - } - return false; - } - - /** - * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. - */ - public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { - assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); - return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); - } - - public static KeyValue buildKeyValue(KeyValue... keyValues) { - KeyValue keyValue = OMS.newKeyValue(); - for (KeyValue properties : keyValues) { - for (String key : properties.keySet()) { - keyValue.put(key, properties.getString(key)); - } - } - return keyValue; - } - - /** - * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. 
- */ - public static <T> Iterator<T> cycle(final Iterable<T> iterable) { - return new Iterator<T>() { - Iterator<T> iterator = new Iterator<T>() { - @Override - public synchronized boolean hasNext() { - return false; - } - - @Override - public synchronized T next() { - throw new NoSuchElementException(); - } - - @Override - public synchronized void remove() { - //Ignore - } - }; - - @Override - public synchronized boolean hasNext() { - return iterator.hasNext() || iterable.iterator().hasNext(); - } - - @Override - public synchronized T next() { - if (!iterator.hasNext()) { - iterator = iterable.iterator(); - if (!iterator.hasNext()) { - throw new NoSuchElementException(); - } - } - return iterator.next(); - } - - @Override - public synchronized void remove() { - iterator.remove(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 968229a..0ffd36c 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; -import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.Collections; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -38,32 +38,19 @@ class LocalMessageCache { private final Map<String, ConsumeRequest> consumedRequest; private final ConcurrentHashMap<MessageQueue, Long> 
pullOffsetTable; private final DefaultMQPullConsumer rocketmqPullConsumer; - private int pullBatchNums = 32; - private int pollTimeout = -1; + private final ClientConfig clientConfig; private final static Logger log = ClientLogger.getLog(); - LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) { - int cacheCapacity = 1000; - if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) { - cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY); - } - consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity); - - if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) { - pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS); - } - - if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { - pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); - } - + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { + consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity()); this.consumedRequest = new ConcurrentHashMap<>(); this.pullOffsetTable = new ConcurrentHashMap<>(); this.rocketmqPullConsumer = rocketmqPullConsumer; + this.clientConfig = clientConfig; } int nextPullBatchNums() { - return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity()); + return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity()); } long nextPullOffset(MessageQueue remoteQueue) { @@ -90,31 +77,25 @@ class LocalMessageCache { } MessageExt poll() { - try { - ConsumeRequest consumeRequest = consumeRequestCache.take(); - consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); - } catch (InterruptedException ignore) { - } - return null; + return poll(clientConfig.getOmsOperationTimeout()); } MessageExt 
poll(final KeyValue properties) { - int currentPollTimeout = pollTimeout; + int currentPollTimeout = clientConfig.getOmsOperationTimeout(); if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); } + return poll(currentPollTimeout); + } - if (currentPollTimeout == -1) { - return poll(); - } - + private MessageExt poll(long timeout) { try { - ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS); - consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); + return consumeRequest.getMessageExt(); + } } catch (InterruptedException ignore) { } return null; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index bd33d78..56a49a4 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -21,9 +21,10 @@ import io.openmessaging.Message; import io.openmessaging.PropertyKeys; import io.openmessaging.PullConsumer; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.ClientConfig; 
import io.openmessaging.rocketmq.domain.ConsumeRequest; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.BeanUtils; +import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; @@ -44,6 +45,7 @@ public class PullConsumerImpl implements PullConsumer { private String targetQueueName; private final MQPullConsumerScheduleService pullConsumerScheduleService; private final LocalMessageCache localMessageCache; + private final ClientConfig clientConfig; final static Logger log = ClientLogger.getLog(); @@ -51,7 +53,9 @@ public class PullConsumerImpl implements PullConsumer { this.properties = properties; this.targetQueueName = queueName; - String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); + + String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -59,7 +63,7 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -67,16 +71,14 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); - int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); - if (maxReDeliveryTimes != 0) { - this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); - } + int 
maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes(); + this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); properties.put(PropertyKeys.CONSUMER_ID, consumerId); - this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties); + this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } @Override @@ -86,12 +88,14 @@ public class PullConsumerImpl implements PullConsumer { @Override public Message poll() { - return OMSUtil.msgConvert(localMessageCache.poll()); + MessageExt rmqMsg = localMessageCache.poll(); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override public Message poll(final KeyValue properties) { - return OMSUtil.msgConvert(localMessageCache.poll(properties)); + MessageExt rmqMsg = localMessageCache.poll(properties); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 9c3b6a9..65c8ee0 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -24,7 +24,9 @@ import io.openmessaging.PropertyKeys; import io.openmessaging.PushConsumer; import io.openmessaging.ReceivedMessageContext; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.ClientConfig; +import io.openmessaging.rocketmq.utils.BeanUtils; +import 
io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.List; import java.util.Map; @@ -43,43 +45,29 @@ public class PushConsumerImpl implements PushConsumer { private final KeyValue properties; private boolean started = false; private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>(); + private final ClientConfig clientConfig; public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.properties = properties; + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); - String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); - - int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); - if (maxReDeliveryTimes != 0) { - this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); - } - - int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT); - if (messageConsumeTimeout != 0) { - this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout); - } - - int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS); - if (maxConsumeThreadNums != 0) { - this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums); - } - - int minConsumeThreadNums = 
properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS); - if (minConsumeThreadNums != 0) { - this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums); - } + this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes()); + this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout()); + this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums()); + this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums()); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); @@ -181,10 +169,9 @@ public class PushConsumerImpl implements PushConsumer { long begin = System.currentTimeMillis(); listener.onMessage(omsMsg, context); long costs = System.currentTimeMillis() - begin; - + long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; try { - sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs) - , TimeUnit.MILLISECONDS); + sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 32d65cd..7de7888 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -27,8 +27,9 @@ import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.exception.OMSNotSupportedException; import 
io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSTimeOutException; +import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.BytesMessageImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.BeanUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; @@ -38,33 +39,29 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.slf4j.Logger; -import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName; +import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ final static Logger log = ClientLogger.getLog(); final KeyValue properties; final DefaultMQProducer rocketmqProducer; private boolean started = false; + final ClientConfig clientConfig; AbstractOMSProducer(final KeyValue properties) { this.properties = properties; this.rocketmqProducer = new DefaultMQProducer(); + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); - - String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP); - if (producerGroup == null || producerGroup.isEmpty()) { - producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; - } - this.rocketmqProducer.setProducerGroup(producerGroup); + this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); String producerId = buildInstanceName(); - 
int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); - this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); properties.put(PropertyKeys.PRODUCER_ID, producerId); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index f5d2f25..8b2ddd2 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -25,12 +25,12 @@ import io.openmessaging.Promise; import io.openmessaging.PropertyKeys; import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.promise.DefaultPromise; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; -import static io.openmessaging.rocketmq.OMSUtil.msgConvert; +import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert; public class ProducerImpl extends AbstractOMSProducer implements Producer { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java 
b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java index 89ece2b..58b1a12 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -21,7 +21,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.MessageHeader; import io.openmessaging.SequenceProducer; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSUtil; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java new file mode 100644 index 0000000..d8eed84 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.log.ClientLogger; +import org.slf4j.Logger; + +public final class BeanUtils { + final static Logger log = ClientLogger.getLog(); + + /** + * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. + */ + private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>(); + + static { + primitiveWrapperMap.put(Boolean.TYPE, Boolean.class); + primitiveWrapperMap.put(Byte.TYPE, Byte.class); + primitiveWrapperMap.put(Character.TYPE, Character.class); + primitiveWrapperMap.put(Short.TYPE, Short.class); + primitiveWrapperMap.put(Integer.TYPE, Integer.class); + primitiveWrapperMap.put(Long.TYPE, Long.class); + primitiveWrapperMap.put(Double.TYPE, Double.class); + primitiveWrapperMap.put(Float.TYPE, Float.class); + primitiveWrapperMap.put(Void.TYPE, Void.TYPE); + } + + private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>(); + + static { + for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) { + final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass); + if (!primitiveClass.equals(wrapperClass)) { + wrapperMap.put(wrapperClass, primitiveClass); + } + } + wrapperMap.put(String.class, String.class); + } + + /** + * <p>Populate the JavaBeans properties of the specified bean, based on + * the specified name/value pairs. 
This method uses Java reflection APIs + * to identify corresponding "property setter" method names, and deals + * with setter arguments of type <Code>String</Code>, <Code>boolean</Code>, + * <Code>int</Code>, <Code>long</Code>, <Code>float</Code>, and + * <Code>double</Code>.</p> + * + * <p>The particular setter method to be called for each property is + * determined using the usual JavaBeans introspection mechanisms. Thus, + * you may identify custom setter methods using a BeanInfo class that is + * associated with the class of the bean itself. If no such BeanInfo + * class is available, the standard method name conversion ("set" plus + * the capitalized name of the property in question) is used.</p> + * + * <p><strong>NOTE</strong>: It is contrary to the JavaBeans Specification + * to have more than one setter method (with different argument + * signatures) for the same property.</p> + * + * @param clazz JavaBean class whose properties are being populated + * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set + * @param <T> Class type + * @return Class instance + */ + public static <T> T populate(final Properties properties, final Class<T> clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static <T> T populate(final KeyValue properties, final Class<T> clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static Class<?> getMethodClass(Class<?> clazz, String methodName) { + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + if (method.getName().equalsIgnoreCase(methodName)) { + return method.getParameterTypes()[0]; + } + } + return null; + } + + public static void setProperties(Class<?> clazz, Object obj, String 
methodName, + Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class<?> parameterClass = getMethodClass(clazz, methodName); + Method setterMethod = clazz.getMethod(methodName, parameterClass); + if (parameterClass == Boolean.TYPE) { + setterMethod.invoke(obj, Boolean.valueOf(value.toString())); + } else if (parameterClass == Integer.TYPE) { + setterMethod.invoke(obj, Integer.valueOf(value.toString())); + } else if (parameterClass == Double.TYPE) { + setterMethod.invoke(obj, Double.valueOf(value.toString())); + } else if (parameterClass == Float.TYPE) { + setterMethod.invoke(obj, Float.valueOf(value.toString())); + } else if (parameterClass == Long.TYPE) { + setterMethod.invoke(obj, Long.valueOf(value.toString())); + } else + setterMethod.invoke(obj, value); + } + + public static <T> T populate(final Properties properties, final T obj) { + Class<?> clazz = obj.getClass(); + try { + + Set<Map.Entry<Object, Object>> entries = properties.entrySet(); + for (Map.Entry<Object, Object> entry : entries) { + String entryKey = entry.getKey().toString(); + String[] keyGroup = entryKey.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... 
+ } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static <T> T populate(final KeyValue properties, final T obj) { + Class<?> clazz = obj.getClass(); + try { + + final Set<String> keySet = properties.keySet(); + for (String key : keySet) { + String[] keyGroup = key.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e3c1b00/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java new file mode 100644 index 0000000..60c8408 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageHeader; +import io.openmessaging.OMS; +import io.openmessaging.SendResult; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.domain.SendResultImpl; +import java.lang.reflect.Field; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageAccessor; + +public class OMSUtil { + + /** + * Builds a OMS client instance name. 
+ * + * @return a unique instance name + */ + public static String buildInstanceName() { + return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); + } + + public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { + org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); + rmqMessage.setBody(omsMessage.getBody()); + + KeyValue headers = omsMessage.headers(); + KeyValue properties = omsMessage.properties(); + + //All destinations in RocketMQ use Topic + if (headers.containsKey(MessageHeader.TOPIC)) { + rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + } else { + rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); + } + + for (String key : properties.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + } + + //Headers has a high priority + for (String key : headers.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + } + + return rmqMessage; + } + + public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { + BytesMessage omsMsg = new BytesMessageImpl(); + omsMsg.setBody(rmqMsg.getBody()); + + KeyValue headers = omsMsg.headers(); + KeyValue properties = omsMsg.properties(); + + final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet(); + + for (final Map.Entry<String, String> entry : entries) { + if (isOMSHeader(entry.getKey())) { + headers.put(entry.getKey(), entry.getValue()); + } else { + properties.put(entry.getKey(), entry.getValue()); + } + } + + omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); + if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || + 
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { + omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); + } else { + omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); + } + omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); + omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + return omsMsg; + } + + public static boolean isOMSHeader(String value) { + for (Field field : MessageHeader.class.getDeclaredFields()) { + try { + if (field.get(MessageHeader.class).equals(value)) { + return true; + } + } catch (IllegalAccessException e) { + return false; + } + } + return false; + } + + /** + * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. + */ + public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { + assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); + return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); + } + + public static KeyValue buildKeyValue(KeyValue... keyValues) { + KeyValue keyValue = OMS.newKeyValue(); + for (KeyValue properties : keyValues) { + for (String key : properties.keySet()) { + keyValue.put(key, properties.getString(key)); + } + } + return keyValue; + } + + /** + * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. 
+ */ + public static <T> Iterator<T> cycle(final Iterable<T> iterable) { + return new Iterator<T>() { + Iterator<T> iterator = new Iterator<T>() { + @Override + public synchronized boolean hasNext() { + return false; + } + + @Override + public synchronized T next() { + throw new NoSuchElementException(); + } + + @Override + public synchronized void remove() { + //Ignore + } + }; + + @Override + public synchronized boolean hasNext() { + return iterator.hasNext() || iterable.iterator().hasNext(); + } + + @Override + public synchronized T next() { + if (!iterator.hasNext()) { + iterator = iterable.iterator(); + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + } + return iterator.next(); + } + + @Override + public synchronized void remove() { + iterator.remove(); + } + }; + } +}
