http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java new file mode 100644 index 0000000..070635a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.common.message.*; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class DefaultMQProducer extends ClientConfig implements MQProducer { + protected final transient DefaultMQProducerImpl defaultMQProducerImpl; + private String producerGroup; + /** + * Just for testing or demo program + */ + private String createTopicKey = MixAll.DEFAULT_TOPIC; + private volatile int defaultTopicQueueNums = 4; + private int sendMsgTimeout = 3000; + private int compressMsgBodyOverHowmuch = 1024 * 4; + private int retryTimesWhenSendFailed = 2; + private int retryTimesWhenSendAsyncFailed = 2; + + private boolean retryAnotherBrokerWhenNotStoreOK = false; + private int maxMessageSize = 1024 * 1024 * 4; // 4M + public DefaultMQProducer() { + this(MixAll.DEFAULT_PRODUCER_GROUP, null); + } + + + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + } + + + public DefaultMQProducer(final String producerGroup) { + this(producerGroup, null); + } + + + public DefaultMQProducer(RPCHook rpcHook) { + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + } + + + @Override + public void start() throws MQClientException { + this.defaultMQProducerImpl.start(); + } + + @Override + public void shutdown() { + this.defaultMQProducerImpl.shutdown(); + } + + + @Override + public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { + return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); + } + + + @Override + public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg); + } + + + @Override + public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, timeout); + } + + + @Override + public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, sendCallback); + } + + + @Override + public void send(Message msg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg); + } + + + @Override + public SendResult send(Message msg, MessageQueue mq) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, mq); + } + + + @Override + public SendResult send(Message msg, MessageQueue mq, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, mq, timeout); + } + + + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, mq, sendCallback); + } + + + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg, mq); + } + + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, selector, arg); + } + + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); + } + + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); + } + + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg, selector, arg); + } + + + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) + throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.defaultMQProducerImpl.searchOffset(mq, timestamp); + } + + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.maxOffset(mq); + } + + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.minOffset(mq); + } + + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); + } + + + @Override + public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQProducerImpl.viewMessage(offsetMsgId); + } + + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); + } + + + @Override + public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId); + return this.viewMessage(msgId); + } catch (Exception e) { + } + return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); + } + + public String getProducerGroup() { + return producerGroup; + } + + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + + public String getCreateTopicKey() { + return createTopicKey; + } + + + public void setCreateTopicKey(String createTopicKey) { + this.createTopicKey = createTopicKey; + } + + + public int getSendMsgTimeout() { + return sendMsgTimeout; + } + + + public void setSendMsgTimeout(int sendMsgTimeout) { + this.sendMsgTimeout = sendMsgTimeout; + } + + + public int getCompressMsgBodyOverHowmuch() { + return compressMsgBodyOverHowmuch; + } + + + public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { + this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; + } + + + public DefaultMQProducerImpl getDefaultMQProducerImpl() { + return defaultMQProducerImpl; + } + + + public boolean isRetryAnotherBrokerWhenNotStoreOK() { + return retryAnotherBrokerWhenNotStoreOK; + } + + + public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) { + this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK; + } + + + public int getMaxMessageSize() { + return maxMessageSize; + } + + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + + public int getDefaultTopicQueueNums() { + return defaultTopicQueueNums; + } + + + public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { + this.defaultTopicQueueNums = defaultTopicQueueNums; + } + + + public int getRetryTimesWhenSendFailed() { + return retryTimesWhenSendFailed; + } + + + public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { + this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; + } + + + public boolean isSendMessageWithVIPChannel() { + return isVipChannelEnabled(); + } + + + public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) { + this.setVipChannelEnabled(sendMessageWithVIPChannel); + } + + + public long[] getNotAvailableDuration() { + return this.defaultMQProducerImpl.getNotAvailableDuration(); + } + + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration); + } + + public long[] getLatencyMax() { + return this.defaultMQProducerImpl.getLatencyMax(); + } + + public void setLatencyMax(final long[] latencyMax) { + this.defaultMQProducerImpl.setLatencyMax(latencyMax); + } + + public boolean isSendLatencyFaultEnable() { + return this.defaultMQProducerImpl.isSendLatencyFaultEnable(); + } + + public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { + this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable); + } + + public int getRetryTimesWhenSendAsyncFailed() { + return retryTimesWhenSendAsyncFailed; + } + + public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) { + this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java new file mode 100644 index 0000000..5e8178a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; + + +/** + * @author shijia.wxr + */ +public interface LocalTransactionExecuter { + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java new file mode 100644 index 0000000..ce5b0d9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public enum LocalTransactionState { + COMMIT_MESSAGE, + ROLLBACK_MESSAGE, + UNKNOW, +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java new file mode 100644 index 0000000..0ea4a33 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.client.MQAdmin; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface MQProducer extends MQAdmin { + void start() throws MQClientException; + + void shutdown(); + + + List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; + + + SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + + SendResult send(final Message msg, final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + + void send(final Message msg, final SendCallback sendCallback) throws MQClientException, + RemotingException, InterruptedException; + + + void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException; + + + void sendOneway(final Message msg) throws MQClientException, RemotingException, + InterruptedException; + + + SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + + + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException; + + + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException; + + + void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, + RemotingException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + + void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback) throws MQClientException, RemotingException, + InterruptedException; + + + void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException; + + + void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) + throws MQClientException, RemotingException, InterruptedException; + + + TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java new file mode 100644 index 0000000..c7a9124 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface MessageQueueSelector { + MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java new file mode 100644 index 0000000..7b0e00e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public interface SendCallback { + public void onSuccess(final SendResult sendResult); + + + public void onException(final Throwable e); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java new file mode 100644 index 0000000..02ed6b5 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import com.alibaba.fastjson.JSON; +import org.apache.rocketmq.common.message.MessageQueue; + + +/** + * @author shijia.wxr + */ +public class SendResult { + private SendStatus sendStatus; + private String msgId; + private MessageQueue messageQueue; + private long queueOffset; + private String transactionId; + private String offsetMsgId; + private String regionId; + private boolean traceOn = true; + + public SendResult() { + } + + public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) { + this.sendStatus = sendStatus; + this.msgId = msgId; + this.offsetMsgId = offsetMsgId; + this.messageQueue = messageQueue; + this.queueOffset = queueOffset; + } + + public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) { + this.sendStatus = sendStatus; + this.msgId = msgId; + this.messageQueue = messageQueue; + this.queueOffset = queueOffset; + this.transactionId = transactionId; + this.offsetMsgId = offsetMsgId; + this.regionId = regionId; + } + + public boolean isTraceOn() { + return traceOn; + } + + public void setTraceOn(final boolean traceOn) { + this.traceOn = traceOn; + } + + public String getRegionId() { + return regionId; + } + + public void setRegionId(final String regionId) { + this.regionId = regionId; + } + + public static String encoderSendResultToJson(final Object obj) { + return JSON.toJSONString(obj); + } + + public static SendResult decoderSendResultFromJson(String json) { + return JSON.parseObject(json, SendResult.class); + } + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public SendStatus getSendStatus() { + return sendStatus; + } + + + public void setSendStatus(SendStatus sendStatus) { + this.sendStatus = sendStatus; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public long getQueueOffset() { + return queueOffset; + } + + + public void setQueueOffset(long queueOffset) { + this.queueOffset = queueOffset; + } + + + public String getTransactionId() { + return transactionId; + } + + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public String getOffsetMsgId() { + return offsetMsgId; + } + + public void setOffsetMsgId(String offsetMsgId) { + this.offsetMsgId = offsetMsgId; + } + + @Override + public String toString() { + return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue + + ", queueOffset=" + queueOffset + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java new file mode 100644 index 0000000..038bc99 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public enum SendStatus { + SEND_OK, + FLUSH_DISK_TIMEOUT, + FLUSH_SLAVE_TIMEOUT, + SLAVE_NOT_AVAILABLE, +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java new file mode 100644 index 0000000..9a11d50 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.MessageExt; + + +/** + * @author shijia.wxr + */ +public interface TransactionCheckListener { + LocalTransactionState checkLocalTransactionState(final MessageExt msg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java new file mode 100644 index 0000000..eaca6ec --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.RPCHook; + + +/** + * @author shijia.wxr + */ +public class TransactionMQProducer extends DefaultMQProducer { + private TransactionCheckListener transactionCheckListener; + private int checkThreadPoolMinSize = 1; + private int checkThreadPoolMaxSize = 1; + private int checkRequestHoldMax = 2000; + + + public TransactionMQProducer() { + } + + + public TransactionMQProducer(final String producerGroup) { + super(producerGroup); + } + + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { + super(producerGroup, rpcHook); + } + + @Override + public void start() throws MQClientException { + this.defaultMQProducerImpl.initTransactionEnv(); + super.start(); + } + + + @Override + public void shutdown() { + super.shutdown(); + this.defaultMQProducerImpl.destroyTransactionEnv(); + } + + + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { + if (null == this.transactionCheckListener) { + throw new MQClientException("localTransactionBranchCheckListener is null", null); + } + + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); + } + + + public TransactionCheckListener getTransactionCheckListener() { + return transactionCheckListener; + } + + + public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } + + + public int getCheckThreadPoolMinSize() { + return checkThreadPoolMinSize; + } + + + public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { + this.checkThreadPoolMinSize = checkThreadPoolMinSize; + } + + + public int getCheckThreadPoolMaxSize() { + return checkThreadPoolMaxSize; + } + + + public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { + this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; + } + + + public int getCheckRequestHoldMax() { + return checkRequestHoldMax; + } + + + public void setCheckRequestHoldMax(int checkRequestHoldMax) { + this.checkRequestHoldMax = checkRequestHoldMax; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java new file mode 100644 index 0000000..478c39d --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public class TransactionSendResult extends SendResult { + private LocalTransactionState localTransactionState; + + + public TransactionSendResult() { + } + + + public LocalTransactionState getLocalTransactionState() { + return localTransactionState; + } + + + public void setLocalTransactionState(LocalTransactionState localTransactionState) { + this.localTransactionState = localTransactionState; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java new file mode 100644 index 0000000..0f6ce48 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer.selector; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByHash implements MessageQueueSelector { + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + int value = arg.hashCode(); + if (value < 0) { + value = Math.abs(value); + } + + value = value % mqs.size(); + return mqs.get(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java new file mode 100644 index 0000000..1902de5 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer.selector; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { + private Set<String> consumeridcs; + + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + return null; + } + + + public Set<String> getConsumeridcs() { + return consumeridcs; + } + + + public void setConsumeridcs(Set<String> consumeridcs) { + this.consumeridcs = consumeridcs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java new file mode 100644 index 0000000..b39b777 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer.selector; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.Random; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByRandoom implements MessageQueueSelector { + private Random random = new Random(System.currentTimeMillis()); + + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + int value = random.nextInt(); + if (value < 0) { + value = Math.abs(value); + } + + value = value % mqs.size(); + return mqs.get(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java new file mode 100644 index 0000000..3234ada --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.stat; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.body.ConsumeStatus; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.common.stats.StatsSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + + +public class ConsumerStatsManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); + + private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS"; + private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS"; + private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT"; + private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS"; + private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT"; + + private final StatsItemSet topicAndGroupConsumeOKTPS; + private final StatsItemSet topicAndGroupConsumeRT; + private final StatsItemSet topicAndGroupConsumeFailedTPS; + private final StatsItemSet topicAndGroupPullTPS; + private final StatsItemSet topicAndGroupPullRT; + + + public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) { + this.topicAndGroupConsumeOKTPS = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log); + + this.topicAndGroupConsumeRT = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log); + + this.topicAndGroupConsumeFailedTPS = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log); + + this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log); + + this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log); + } + + + public void start() { + } + + + public void shutdown() { + } + + + public void incPullRT(final String group, final String topic, final long rt) { + this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); + } + + + public void incPullTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + + public void incConsumeRT(final String group, final String topic, final long rt) { + this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); + } + + + public void incConsumeOKTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + + public void incConsumeFailedTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + public ConsumeStatus consumeStatus(final String group, final String topic) { + ConsumeStatus cs = new ConsumeStatus(); + { + StatsSnapshot ss = this.getPullRT(group, topic); + if (ss != null) { + cs.setPullRT(ss.getAvgpt()); + } + } + + { + StatsSnapshot ss = this.getPullTPS(group, topic); + if (ss != null) { + cs.setPullTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.getConsumeRT(group, topic); + if (ss != null) { + cs.setConsumeRT(ss.getAvgpt()); + } + } + + { + StatsSnapshot ss = this.getConsumeOKTPS(group, topic); + if (ss != null) { + cs.setConsumeOKTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.getConsumeFailedTPS(group, topic); + if (ss != null) { + cs.setConsumeFailedTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group); + if (ss != null) { + cs.setConsumeFailedMsgs(ss.getSum()); + } + } + + return cs; + } + + private StatsSnapshot getPullRT(final String group, final String topic) { + return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getPullTPS(final String group, final String topic) { + return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getConsumeRT(final String group, final String topic) { + StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group); + if (0 == statsData.getSum()) { + statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group); + } + + return statsData; + } + + private StatsSnapshot getConsumeOKTPS(final String group, final String topic) { + return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) { + return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java deleted file mode 100644 index 2a10ec4..0000000 --- a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import org.junit.Assert; -import org.junit.Test; - - -public class ValidatorsTest { - - @Test - public void topicValidatorTest() throws MQClientException { - Validators.checkTopic("Hello"); - Validators.checkTopic("%RETRY%Hello"); - Validators.checkTopic("_%RETRY%Hello"); - Validators.checkTopic("-%RETRY%Hello"); - Validators.checkTopic("223-%RETRY%Hello"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java deleted file mode 100644 index 5ef75ed..0000000 --- a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * @author [email protected] - * @version $id$ - */ -package com.alibaba.rocketmq.client.consumer.loadbalance; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; -import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; -import com.alibaba.rocketmq.common.message.MessageQueue; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * @author [email protected] created on 2013-07-03 16:24 - */ -public class AllocateMessageQueueAveragelyTest { - private AllocateMessageQueueStrategy allocateMessageQueueAveragely; - private String currentCID; - private String topic; - private List<MessageQueue> messageQueueList; - private List<String> consumerIdList; - - @Before - public void init() { - allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); - topic = "topic_test"; - } - - @Test - public void testConsumer1() { - currentCID = "0"; - createConsumerIdList(1); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer1"); - Assert.assertEquals(result.size(), 5); - Assert.assertEquals(result.containsAll(getMessageQueueList()), true); - } - - public void createConsumerIdList(int size) { - consumerIdList = new ArrayList<String>(size); - for (int i = 0; i < size; i++) { - consumerIdList.add(String.valueOf(i)); - } - } - - public void createMessageQueueList(int size) { - messageQueueList = new ArrayList<MessageQueue>(size); - for (int i = 0; i < size; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - messageQueueList.add(mq); - } - } - - public void printMessageQueue(List<MessageQueue> messageQueueList, String name) { - if (messageQueueList == null || messageQueueList.size() < 1) - return; - System.out.println(name + ".......................................start"); - for (MessageQueue messageQueue : messageQueueList) { - System.out.println(messageQueue); - } - System.out.println(name + ".......................................end"); - } - - public List<MessageQueue> getMessageQueueList() { - return messageQueueList; - } - - public void setMessageQueueList(List<MessageQueue> messageQueueList) { - this.messageQueueList = messageQueueList; - } - - @Test - public void testConsumer2() { - currentCID = "1"; - createConsumerIdList(2); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer2"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); - - } - - @Test - public void testConsumer3CurrentCID0() { - currentCID = "0"; - createConsumerIdList(3); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID0"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true); - } - - @Test - public void testConsumer3CurrentCID1() { - currentCID = "1"; - createConsumerIdList(3); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID1"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer3CurrentCID2() { - currentCID = "2"; - createConsumerIdList(3); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID2"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); - } - - @Test - public void testConsumer4() { - currentCID = "1"; - createConsumerIdList(4); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer4"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer5() { - currentCID = "1"; - createConsumerIdList(5); - createMessageQueueList(5); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer5"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer6() { - currentCID = "1"; - createConsumerIdList(2); - createMessageQueueList(6); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true); - } - - @Test - public void testCurrentCIDNotExists() { - currentCID = String.valueOf(Integer.MAX_VALUE); - createConsumerIdList(2); - createMessageQueueList(6); - List<MessageQueue> result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testCurrentCIDNotExists"); - Assert.assertEquals(result.size(), 0); - } - - @Test(expected = IllegalArgumentException.class) - public void testCurrentCIDIllegalArgument() { - createConsumerIdList(2); - createMessageQueueList(6); - allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList()); - } - - public List<String> getConsumerIdList() { - return consumerIdList; - } - - public void setConsumerIdList(List<String> consumerIdList) { - this.consumerIdList = consumerIdList; - } - - @Test(expected = IllegalArgumentException.class) - public void testMessageQueueIllegalArgument() { - currentCID = "0"; - createConsumerIdList(2); - allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList()); - } - - @Test(expected = IllegalArgumentException.class) - public void testConsumerIdIllegalArgument() { - currentCID = "0"; - createMessageQueueList(6); - allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null); - } - - @Test - public void testAllocate() { - AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); - String topic = "topic_test"; - String currentCID = "CID"; - int queueSize = 19; - int consumerSize = 10; - List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); - for (int i = 0; i < queueSize; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - mqAll.add(mq); - } - - List<String> cidAll = new ArrayList<String>(); - for (int j = 0; j < consumerSize; j++) { - cidAll.add("CID" + j); - } - System.out.println(mqAll.toString()); - System.out.println(cidAll.toString()); - for (int i = 0; i < consumerSize; i++) { - List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll); - System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); - } - } - - - @Test - public void testAllocateByCircle() { - AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle(); - String topic = "topic_test"; - String currentCID = "CID"; - int consumerSize = 3; - int queueSize = 13; - List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); - for (int i = 0; i < queueSize; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - mqAll.add(mq); - } - - List<String> cidAll = new ArrayList<String>(); - for (int j = 0; j < consumerSize; j++) { - cidAll.add("CID" + j); - } - System.out.println(mqAll.toString()); - System.out.println(cidAll.toString()); - for (int i = 0; i < consumerSize; i++) { - List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll); - System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java new file mode 100644 index 0000000..a3daba5 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.junit.Test; + + +public class ValidatorsTest { + + @Test + public void topicValidatorTest() throws MQClientException { + Validators.checkTopic("Hello"); + Validators.checkTopic("%RETRY%Hello"); + Validators.checkTopic("_%RETRY%Hello"); + Validators.checkTopic("-%RETRY%Hello"); + Validators.checkTopic("223-%RETRY%Hello"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java new file mode 100644 index 0000000..7b568c5 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * @author [email protected] + * @version $id$ + */ +package org.apache.rocketmq.client.consumer.loadbalance; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author [email protected] created on 2013-07-03 16:24 + */ +public class AllocateMessageQueueAveragelyTest { + private AllocateMessageQueueStrategy allocateMessageQueueAveragely; + private String currentCID; + private String topic; + private List<MessageQueue> messageQueueList; + private List<String> consumerIdList; + + @Before + public void init() { + allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); + topic = "topic_test"; + } + + @Test + public void testConsumer1() { + currentCID = "0"; + createConsumerIdList(1); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer1"); + Assert.assertEquals(result.size(), 5); + Assert.assertEquals(result.containsAll(getMessageQueueList()), true); + } + + public void createConsumerIdList(int size) { + consumerIdList = new ArrayList<String>(size); + for (int i = 0; i < size; i++) { + consumerIdList.add(String.valueOf(i)); + } + } + + public void createMessageQueueList(int size) { + messageQueueList = new ArrayList<MessageQueue>(size); + for (int i = 0; i < size; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + messageQueueList.add(mq); + } + } + + public void printMessageQueue(List<MessageQueue> messageQueueList, String name) { + if (messageQueueList == null || messageQueueList.size() < 1) + return; + System.out.println(name + ".......................................start"); + for (MessageQueue messageQueue : messageQueueList) { + System.out.println(messageQueue); + } + System.out.println(name + ".......................................end"); + } + + public List<MessageQueue> getMessageQueueList() { + return messageQueueList; + } + + public void setMessageQueueList(List<MessageQueue> messageQueueList) { + this.messageQueueList = messageQueueList; + } + + @Test + public void testConsumer2() { + currentCID = "1"; + createConsumerIdList(2); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer2"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); + + } + + @Test + public void testConsumer3CurrentCID0() { + currentCID = "0"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID0"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true); + } + + @Test + public void testConsumer3CurrentCID1() { + currentCID = "1"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID1"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer3CurrentCID2() { + currentCID = "2"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID2"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); + } + + @Test + public void testConsumer4() { + currentCID = "1"; + createConsumerIdList(4); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer4"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer5() { + currentCID = "1"; + createConsumerIdList(5); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer5"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer6() { + currentCID = "1"; + createConsumerIdList(2); + createMessageQueueList(6); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true); + } + + @Test + public void testCurrentCIDNotExists() { + currentCID = String.valueOf(Integer.MAX_VALUE); + createConsumerIdList(2); + createMessageQueueList(6); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testCurrentCIDNotExists"); + Assert.assertEquals(result.size(), 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testCurrentCIDIllegalArgument() { + createConsumerIdList(2); + createMessageQueueList(6); + allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList()); + } + + public List<String> getConsumerIdList() { + return consumerIdList; + } + + public void setConsumerIdList(List<String> consumerIdList) { + this.consumerIdList = consumerIdList; + } + + @Test(expected = IllegalArgumentException.class) + public void testMessageQueueIllegalArgument() { + currentCID = "0"; + createConsumerIdList(2); + allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConsumerIdIllegalArgument() { + currentCID = "0"; + createMessageQueueList(6); + allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null); + } + + @Test + public void testAllocate() { + AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); + String topic = "topic_test"; + String currentCID = "CID"; + int queueSize = 19; + int consumerSize = 10; + List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); + for (int i = 0; i < queueSize; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + mqAll.add(mq); + } + + List<String> cidAll = new ArrayList<String>(); + for (int j = 0; j < consumerSize; j++) { + cidAll.add("CID" + j); + } + System.out.println(mqAll.toString()); + System.out.println(cidAll.toString()); + for (int i = 0; i < consumerSize; i++) { + List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll); + System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); + } + } + + + @Test + public void testAllocateByCircle() { + AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle(); + String topic = "topic_test"; + String currentCID = "CID"; + int consumerSize = 3; + int queueSize = 13; + List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); + for (int i = 0; i < queueSize; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + mqAll.add(mq); + } + + List<String> cidAll = new ArrayList<String>(); + for (int j = 0; j < consumerSize; j++) { + cidAll.add("CID" + j); + } + System.out.println(mqAll.toString()); + System.out.println(cidAll.toString()); + for (int i = 0; i < consumerSize; i++) { + List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll); + System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index 72cc2b0..ec95a76 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -18,7 +18,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <groupId>com.alibaba.rocketmq</groupId> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>4.0.0-SNAPSHOT</version> </parent>
