http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java deleted file mode 100644 index 6f861d3..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.client.ClientConfig; -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class DefaultMQProducer extends ClientConfig implements MQProducer { - protected final transient DefaultMQProducerImpl defaultMQProducerImpl; - private String producerGroup; - /** - * Just for testing or demo program - */ - private String createTopicKey = MixAll.DEFAULT_TOPIC; - private volatile int defaultTopicQueueNums = 4; - private int sendMsgTimeout = 3000; - private int compressMsgBodyOverHowmuch = 1024 * 4; - private int retryTimesWhenSendFailed = 2; - private int retryTimesWhenSendAsyncFailed = 2; - - private boolean retryAnotherBrokerWhenNotStoreOK = false; - private int maxMessageSize = 1024 * 1024 * 4; // 4M - public DefaultMQProducer() { - this(MixAll.DEFAULT_PRODUCER_GROUP, null); - } - - - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { - this.producerGroup = producerGroup; - defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - } - - - public DefaultMQProducer(final String producerGroup) { - this(producerGroup, null); - } - - - public DefaultMQProducer(RPCHook rpcHook) { - this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); - } - - - @Override - public void start() throws MQClientException { - this.defaultMQProducerImpl.start(); - } - - @Override - public void shutdown() { - this.defaultMQProducerImpl.shutdown(); - } - - - @Override - public List<MessageQueue> fetchPublishMessageQueues(String topic) 
throws MQClientException { - return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); - } - - - @Override - public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg); - } - - - @Override - public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, timeout); - } - - - @Override - public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, sendCallback); - } - - - @Override - public void send(Message msg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, sendCallback, timeout); - } - - - @Override - public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.sendOneway(msg); - } - - - @Override - public SendResult send(Message msg, MessageQueue mq) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, mq); - } - - - @Override - public SendResult send(Message msg, MessageQueue mq, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, mq, timeout); - } - - - @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, mq, sendCallback); - } - - - @Override - public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, 
mq, sendCallback, timeout); - } - - - @Override - public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.sendOneway(msg, mq); - } - - - @Override - public SendResult send(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, selector, arg); - } - - - @Override - public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); - } - - - @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); - } - - - @Override - public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); - } - - - @Override - public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) - throws MQClientException, RemotingException, InterruptedException { - this.defaultMQProducerImpl.sendOneway(msg, selector, arg); - } - - - @Override - public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) - throws MQClientException { - throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); - } - - - @Override - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - - @Override - public 
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); - } - - - @Override - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.defaultMQProducerImpl.searchOffset(mq, timestamp); - } - - - @Override - public long maxOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.maxOffset(mq); - } - - - @Override - public long minOffset(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.minOffset(mq); - } - - - @Override - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); - } - - - @Override - public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return this.defaultMQProducerImpl.viewMessage(offsetMsgId); - } - - - @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); - } - - - @Override - public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - try { - MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId); - return this.viewMessage(msgId); - } catch (Exception e) { - } - return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); - } - - public String getProducerGroup() { - return producerGroup; - } - - - public void setProducerGroup(String producerGroup) { - this.producerGroup = producerGroup; - } - - - public String getCreateTopicKey() { - return createTopicKey; - } - - - public void setCreateTopicKey(String createTopicKey) { - this.createTopicKey = createTopicKey; - } - - - public 
int getSendMsgTimeout() { - return sendMsgTimeout; - } - - - public void setSendMsgTimeout(int sendMsgTimeout) { - this.sendMsgTimeout = sendMsgTimeout; - } - - - public int getCompressMsgBodyOverHowmuch() { - return compressMsgBodyOverHowmuch; - } - - - public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { - this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; - } - - - public DefaultMQProducerImpl getDefaultMQProducerImpl() { - return defaultMQProducerImpl; - } - - - public boolean isRetryAnotherBrokerWhenNotStoreOK() { - return retryAnotherBrokerWhenNotStoreOK; - } - - - public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) { - this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK; - } - - - public int getMaxMessageSize() { - return maxMessageSize; - } - - - public void setMaxMessageSize(int maxMessageSize) { - this.maxMessageSize = maxMessageSize; - } - - - public int getDefaultTopicQueueNums() { - return defaultTopicQueueNums; - } - - - public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { - this.defaultTopicQueueNums = defaultTopicQueueNums; - } - - - public int getRetryTimesWhenSendFailed() { - return retryTimesWhenSendFailed; - } - - - public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { - this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; - } - - - public boolean isSendMessageWithVIPChannel() { - return isVipChannelEnabled(); - } - - - public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) { - this.setVipChannelEnabled(sendMessageWithVIPChannel); - } - - - public long[] getNotAvailableDuration() { - return this.defaultMQProducerImpl.getNotAvailableDuration(); - } - - public void setNotAvailableDuration(final long[] notAvailableDuration) { - this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration); - } - - public long[] getLatencyMax() { - return this.defaultMQProducerImpl.getLatencyMax(); - } 
- - public void setLatencyMax(final long[] latencyMax) { - this.defaultMQProducerImpl.setLatencyMax(latencyMax); - } - - public boolean isSendLatencyFaultEnable() { - return this.defaultMQProducerImpl.isSendLatencyFaultEnable(); - } - - public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { - this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable); - } - - public int getRetryTimesWhenSendAsyncFailed() { - return retryTimesWhenSendAsyncFailed; - } - - public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) { - this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java deleted file mode 100644 index af3723a..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.common.message.Message; - - -/** - * @author shijia.wxr - */ -public interface LocalTransactionExecuter { - public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java deleted file mode 100644 index ee2a93a..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -/** - * @author shijia.wxr - */ -public enum LocalTransactionState { - COMMIT_MESSAGE, - ROLLBACK_MESSAGE, - UNKNOW, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java deleted file mode 100644 index e21bc00..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.client.MQAdmin; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.remoting.exception.RemotingException; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public interface MQProducer extends MQAdmin { - void start() throws MQClientException; - - void shutdown(); - - - List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; - - - SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - - - SendResult send(final Message msg, final long timeout) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException; - - - void send(final Message msg, final SendCallback sendCallback) throws MQClientException, - RemotingException, InterruptedException; - - - void send(final Message msg, final SendCallback sendCallback, final long timeout) - throws MQClientException, RemotingException, InterruptedException; - - - void sendOneway(final Message msg) throws MQClientException, RemotingException, - InterruptedException; - - - SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException; - - - SendResult send(final Message msg, final MessageQueue mq, final long timeout) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) - throws MQClientException, RemotingException, InterruptedException; - - - void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) - throws MQClientException, RemotingException, 
InterruptedException; - - - void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, - RemotingException, InterruptedException; - - - SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) - throws MQClientException, RemotingException, MQBrokerException, InterruptedException; - - - SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, - InterruptedException; - - - void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback) throws MQClientException, RemotingException, - InterruptedException; - - - void send(final Message msg, final MessageQueueSelector selector, final Object arg, - final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, - InterruptedException; - - - void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) - throws MQClientException, RemotingException, InterruptedException; - - - TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java deleted file mode 100644 index 924c145..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public interface MessageQueueSelector { - MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java deleted file mode 100644 index 35d1a72..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer; - -/** - * @author shijia.wxr - */ -public interface SendCallback { - public void onSuccess(final SendResult sendResult); - - - public void onException(final Throwable e); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java deleted file mode 100644 index 183accf..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.fastjson.JSON; -import com.alibaba.rocketmq.common.message.MessageQueue; - - -/** - * @author shijia.wxr - */ -public class SendResult { - private SendStatus sendStatus; - private String msgId; - private MessageQueue messageQueue; - private long queueOffset; - private String transactionId; - private String offsetMsgId; - private String regionId; - private boolean traceOn = true; - - public SendResult() { - } - - public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) { - this.sendStatus = sendStatus; - this.msgId = msgId; - this.offsetMsgId = offsetMsgId; - this.messageQueue = messageQueue; - this.queueOffset = queueOffset; - } - - public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) { - this.sendStatus = sendStatus; - this.msgId = msgId; - this.messageQueue = messageQueue; - this.queueOffset = queueOffset; - this.transactionId = transactionId; - this.offsetMsgId = offsetMsgId; - this.regionId = regionId; - } - - public boolean isTraceOn() { - return traceOn; - } - - public void setTraceOn(final boolean traceOn) { - this.traceOn = traceOn; - } - - public String getRegionId() { - return regionId; - } - - public void setRegionId(final String regionId) { - this.regionId = regionId; - } - - public static String encoderSendResultToJson(final Object obj) { - return JSON.toJSONString(obj); - } - - public static SendResult decoderSendResultFromJson(String json) { - return JSON.parseObject(json, SendResult.class); - } - - public String getMsgId() { - return msgId; - } - - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - - public SendStatus getSendStatus() { - return sendStatus; - } - - - public void setSendStatus(SendStatus sendStatus) { - this.sendStatus = sendStatus; - } 
- - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - - public void setMessageQueue(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - public long getQueueOffset() { - return queueOffset; - } - - - public void setQueueOffset(long queueOffset) { - this.queueOffset = queueOffset; - } - - - public String getTransactionId() { - return transactionId; - } - - - public void setTransactionId(String transactionId) { - this.transactionId = transactionId; - } - - public String getOffsetMsgId() { - return offsetMsgId; - } - - public void setOffsetMsgId(String offsetMsgId) { - this.offsetMsgId = offsetMsgId; - } - - @Override - public String toString() { - return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue - + ", queueOffset=" + queueOffset + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java deleted file mode 100644 index 3bc572f..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer; - -/** - * @author shijia.wxr - */ -public enum SendStatus { - SEND_OK, - FLUSH_DISK_TIMEOUT, - FLUSH_SLAVE_TIMEOUT, - SLAVE_NOT_AVAILABLE, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java deleted file mode 100644 index 8440537..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.common.message.MessageExt; - - -/** - * @author shijia.wxr - */ -public interface TransactionCheckListener { - LocalTransactionState checkLocalTransactionState(final MessageExt msg); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java deleted file mode 100644 index 08dd4ab..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.remoting.RPCHook; - - -/** - * @author shijia.wxr - */ -public class TransactionMQProducer extends DefaultMQProducer { - private TransactionCheckListener transactionCheckListener; - private int checkThreadPoolMinSize = 1; - private int checkThreadPoolMaxSize = 1; - private int checkRequestHoldMax = 2000; - - - public TransactionMQProducer() { - } - - - public TransactionMQProducer(final String producerGroup) { - super(producerGroup); - } - - public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - super(producerGroup, rpcHook); - } - - @Override - public void start() throws MQClientException { - this.defaultMQProducerImpl.initTransactionEnv(); - super.start(); - } - - - @Override - public void shutdown() { - super.shutdown(); - this.defaultMQProducerImpl.destroyTransactionEnv(); - } - - - @Override - public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { - if (null == this.transactionCheckListener) { - throw new MQClientException("localTransactionBranchCheckListener is null", null); - } - - return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); - } - - - public TransactionCheckListener getTransactionCheckListener() { - return transactionCheckListener; - } - - - public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { - this.transactionCheckListener = transactionCheckListener; - } - - - public int getCheckThreadPoolMinSize() { - return checkThreadPoolMinSize; - } - - - public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { - this.checkThreadPoolMinSize = checkThreadPoolMinSize; - } - - - public int getCheckThreadPoolMaxSize() { - return checkThreadPoolMaxSize; - } 
- - - public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { - this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; - } - - - public int getCheckRequestHoldMax() { - return checkRequestHoldMax; - } - - - public void setCheckRequestHoldMax(int checkRequestHoldMax) { - this.checkRequestHoldMax = checkRequestHoldMax; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java deleted file mode 100644 index e7dcd0e..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer; - -/** - * @author shijia.wxr - */ -public class TransactionSendResult extends SendResult { - private LocalTransactionState localTransactionState; - - - public TransactionSendResult() { - } - - - public LocalTransactionState getLocalTransactionState() { - return localTransactionState; - } - - - public void setLocalTransactionState(LocalTransactionState localTransactionState) { - this.localTransactionState = localTransactionState; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java deleted file mode 100644 index 648356b..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.producer.selector; - -import com.alibaba.rocketmq.client.producer.MessageQueueSelector; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class SelectMessageQueueByHash implements MessageQueueSelector { - - @Override - public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - int value = arg.hashCode(); - if (value < 0) { - value = Math.abs(value); - } - - value = value % mqs.size(); - return mqs.get(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java deleted file mode 100644 index a213391..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer.selector; - -import com.alibaba.rocketmq.client.producer.MessageQueueSelector; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; -import java.util.Set; - - -/** - * @author shijia.wxr - */ -public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { - private Set<String> consumeridcs; - - - @Override - public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - return null; - } - - - public Set<String> getConsumeridcs() { - return consumeridcs; - } - - - public void setConsumeridcs(Set<String> consumeridcs) { - this.consumeridcs = consumeridcs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java deleted file mode 100644 index 3f381e4..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.producer.selector; - -import com.alibaba.rocketmq.client.producer.MessageQueueSelector; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; -import java.util.Random; - - -/** - * @author shijia.wxr - */ -public class SelectMessageQueueByRandoom implements MessageQueueSelector { - private Random random = new Random(System.currentTimeMillis()); - - - @Override - public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - int value = random.nextInt(); - if (value < 0) { - value = Math.abs(value); - } - - value = value % mqs.size(); - return mqs.get(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java deleted file mode 100644 index e07e233..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.stat; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; -import com.alibaba.rocketmq.common.stats.StatsItemSet; -import com.alibaba.rocketmq.common.stats.StatsSnapshot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ScheduledExecutorService; - - -public class ConsumerStatsManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); - - private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS"; - private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS"; - private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT"; - private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS"; - private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT"; - - private final StatsItemSet topicAndGroupConsumeOKTPS; - private final StatsItemSet topicAndGroupConsumeRT; - private final StatsItemSet topicAndGroupConsumeFailedTPS; - private final StatsItemSet topicAndGroupPullTPS; - private final StatsItemSet topicAndGroupPullRT; - - - public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) { - this.topicAndGroupConsumeOKTPS = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log); - - 
this.topicAndGroupConsumeRT = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log); - - this.topicAndGroupConsumeFailedTPS = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log); - - this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log); - - this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log); - } - - - public void start() { - } - - - public void shutdown() { - } - - - public void incPullRT(final String group, final String topic, final long rt) { - this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); - } - - - public void incPullTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1); - } - - - public void incConsumeRT(final String group, final String topic, final long rt) { - this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); - } - - - public void incConsumeOKTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1); - } - - - public void incConsumeFailedTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1); - } - - public ConsumeStatus consumeStatus(final String group, final String topic) { - ConsumeStatus cs = new ConsumeStatus(); - { - StatsSnapshot ss = this.getPullRT(group, topic); - if (ss != null) { - cs.setPullRT(ss.getAvgpt()); - } - } - - { - StatsSnapshot ss = this.getPullTPS(group, topic); - if (ss != null) { - cs.setPullTPS(ss.getTps()); - } - } - - { - StatsSnapshot ss = this.getConsumeRT(group, topic); - if (ss != null) { - cs.setConsumeRT(ss.getAvgpt()); - } - } - - { - StatsSnapshot ss = this.getConsumeOKTPS(group, topic); - if (ss != null) { - cs.setConsumeOKTPS(ss.getTps()); - } - } - - { - StatsSnapshot ss = 
this.getConsumeFailedTPS(group, topic); - if (ss != null) { - cs.setConsumeFailedTPS(ss.getTps()); - } - } - - { - StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group); - if (ss != null) { - cs.setConsumeFailedMsgs(ss.getSum()); - } - } - - return cs; - } - - private StatsSnapshot getPullRT(final String group, final String topic) { - return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group); - } - - private StatsSnapshot getPullTPS(final String group, final String topic) { - return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group); - } - - private StatsSnapshot getConsumeRT(final String group, final String topic) { - StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group); - if (0 == statsData.getSum()) { - statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group); - } - - return statsData; - } - - private StatsSnapshot getConsumeOKTPS(final String group, final String topic) { - return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group); - } - - private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) { - return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java new file mode 100644 index 0000000..8d15108 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.remoting.common.RemotingUtil; + + +/** + * Client Common configuration + * + * @author shijia.wxr + * @author vongosling + */ +public class ClientConfig { + public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel"; + private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private String clientIP = RemotingUtil.getLocalAddress(); + private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT"); + private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); + /** + * Interval for pulling topic information from the name server + */ + private int pollNameServerInteval = 1000 * 30; + /** + * Heartbeat interval in microseconds with message broker + */ + private int heartbeatBrokerInterval = 1000 * 30; + /** + * Offset persistent interval for consumer + */ + private int persistConsumerOffsetInterval = 1000 * 5; + private boolean unitMode = false; + private String unitName; + private boolean vipChannelEnabled = 
Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + + public String buildMQClientId() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClientIP()); + + sb.append("@"); + sb.append(this.getInstanceName()); + if (!UtilAll.isBlank(this.unitName)) { + sb.append("@"); + sb.append(this.unitName); + } + + return sb.toString(); + } + + public String getClientIP() { + return clientIP; + } + + public void setClientIP(String clientIP) { + this.clientIP = clientIP; + } + + public String getInstanceName() { + return instanceName; + } + + public void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public void changeInstanceNameToPID() { + if (this.instanceName.equals("DEFAULT")) { + this.instanceName = String.valueOf(UtilAll.getPid()); + } + } + + public void resetClientConfig(final ClientConfig cc) { + this.namesrvAddr = cc.namesrvAddr; + this.clientIP = cc.clientIP; + this.instanceName = cc.instanceName; + this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads; + this.pollNameServerInteval = cc.pollNameServerInteval; + this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval; + this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval; + this.unitMode = cc.unitMode; + this.unitName = cc.unitName; + this.vipChannelEnabled = cc.vipChannelEnabled; + } + + public ClientConfig cloneClientConfig() { + ClientConfig cc = new ClientConfig(); + cc.namesrvAddr = namesrvAddr; + cc.clientIP = clientIP; + cc.instanceName = instanceName; + cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads; + cc.pollNameServerInteval = pollNameServerInteval; + cc.heartbeatBrokerInterval = heartbeatBrokerInterval; + cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + cc.unitMode = unitMode; + cc.unitName = unitName; + cc.vipChannelEnabled = vipChannelEnabled; + return cc; + } + + public String getNamesrvAddr() { + return namesrvAddr; + } + + public void 
setNamesrvAddr(String namesrvAddr) { + this.namesrvAddr = namesrvAddr; + } + + public int getClientCallbackExecutorThreads() { + return clientCallbackExecutorThreads; + } + + + public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) { + this.clientCallbackExecutorThreads = clientCallbackExecutorThreads; + } + + + public int getPollNameServerInteval() { + return pollNameServerInteval; + } + + + public void setPollNameServerInteval(int pollNameServerInteval) { + this.pollNameServerInteval = pollNameServerInteval; + } + + + public int getHeartbeatBrokerInterval() { + return heartbeatBrokerInterval; + } + + + public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { + this.heartbeatBrokerInterval = heartbeatBrokerInterval; + } + + + public int getPersistConsumerOffsetInterval() { + return persistConsumerOffsetInterval; + } + + + public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { + this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + } + + + public String getUnitName() { + return unitName; + } + + + public void setUnitName(String unitName) { + this.unitName = unitName; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean unitMode) { + this.unitMode = unitMode; + } + + + public boolean isVipChannelEnabled() { + return vipChannelEnabled; + } + + + public void setVipChannelEnabled(final boolean vipChannelEnabled) { + this.vipChannelEnabled = vipChannelEnabled; + } + + + @Override + public String toString() { + return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval + + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", 
vipChannelEnabled=" + + vipChannelEnabled + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java new file mode 100644 index 0000000..9e85283 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; + + +/** + * Base interface for MQ management + * + * @author shijia.wxr + */ +public interface MQAdmin { + /** + * Creates a topic + * + * @param key + * access key + * @param newTopic + * topic name + * @param queueNum + * topic's queue number + * + * @throws MQClientException + */ + void createTopic(final String key, final String newTopic, final int queueNum) + throws MQClientException; + + + /** + * Creates a topic + * + * @param key + * access key + * @param newTopic + * topic name + * @param queueNum + * topic's queue number + * @param topicSysFlag + * topic system flag + * + * @throws MQClientException + */ + void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) + throws MQClientException; + + + /** + * Gets the message queue offset according to some time in milliseconds<br> + * be cautious to call because of more IO overhead + * + * @param mq + * Instance of MessageQueue + * @param timestamp + * from when in milliseconds. 
+ * + * @return offset + * + * @throws MQClientException + */ + long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; + + + /** + * Gets the max offset + * + * @param mq + * Instance of MessageQueue + * + * @return the max offset + * + * @throws MQClientException + */ + long maxOffset(final MessageQueue mq) throws MQClientException; + + + /** + * Gets the minimum offset + * + * @param mq + * Instance of MessageQueue + * + * @return the minimum offset + * + * @throws MQClientException + */ + long minOffset(final MessageQueue mq) throws MQClientException; + + + /** + * Gets the earliest stored message time + * + * @param mq + * Instance of MessageQueue + * + * @return the time in microseconds + * + * @throws MQClientException + */ + long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; + + + /** + * Query message according to message id + * + * @param offsetMsgId + * message id + * + * @return message + * + * @throws InterruptedException + * @throws MQBrokerException + * @throws RemotingException + * @throws MQClientException + */ + MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + + /** + * Query messages + * + * @param topic + * message topic + * @param key + * message key index word + * @param maxNum + * max message number + * @param begin + * from when + * @param end + * to when + * + * @return Instance of QueryResult + * + * @throws MQClientException + * @throws InterruptedException + */ + QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, + final long end) throws MQClientException, InterruptedException; + + /** + + * @param topic + * @param msgId + * @return The {@code MessageExt} of given msgId + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + * @throws MQClientException + */ + MessageExt viewMessage(String topic, String 
msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java new file mode 100644 index 0000000..41009c5 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; + +import java.util.Set; +import java.util.TreeSet; + + +/** + * @author shijia.wxr + */ +public class MQHelper { + public static void resetOffsetByTimestamp( + final MessageModel messageModel, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { + resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp); + } + + /** + * Reset consumer topic offset according to time + * + * @param messageModel + * which model + * @param instanceName + * which instance + * @param consumerGroup + * consumer group + * @param topic + * topic + * @param timestamp + * time + * + * @throws Exception + */ + public static void resetOffsetByTimestamp( + final MessageModel messageModel, + final String instanceName, + final String consumerGroup, + final String topic, + final long timestamp) throws Exception { + final Logger log = ClientLogger.getLog(); + + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup); + consumer.setInstanceName(instanceName); + consumer.setMessageModel(messageModel); + consumer.start(); + + Set<MessageQueue> mqs = null; + try { + mqs = consumer.fetchSubscribeMessageQueues(topic); + if (mqs != null && !mqs.isEmpty()) { + TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs); + for (MessageQueue mq : mqsNew) { + long offset = consumer.searchOffset(mq, timestamp); + if (offset >= 0) { + consumer.updateConsumeOffset(mq, offset); + log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}", + consumerGroup, offset, mq); + } + } + } + } catch (Exception e) { + log.warn("resetOffsetByTimestamp Exception", e); + throw e; + } finally { + if (mqs != null) { + 
consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs); + } + consumer.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/QueryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java new file mode 100644 index 0000000..cdbf1e7 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryResult { + private final long indexLastUpdateTimestamp; + private final List<MessageExt> messageList; + + + public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) { + this.indexLastUpdateTimestamp = indexLastUpdateTimestamp; + this.messageList = messageList; + } + + + public long getIndexLastUpdateTimestamp() { + return indexLastUpdateTimestamp; + } + + + public List<MessageExt> getMessageList() { + return messageList; + } + + + @Override + public String toString() { + return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList=" + + messageList + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java new file mode 100644 index 0000000..e977d44 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.ResponseCode; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * Common Validator + * + * @author manhong.yqd + */ +public class Validators { + public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; + public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); + public static final int CHARACTER_MAX_LENGTH = 255; + + /** + * @param origin + * @param patternStr + * + * @return The resulting {@code String} + */ + public static String getGroupWithRegularExpression(String origin, String patternStr) { + Pattern pattern = Pattern.compile(patternStr); + Matcher matcher = pattern.matcher(origin); + while (matcher.find()) { + return matcher.group(0); + } + return null; + } + + /** + * Validate group + * + * @param group + * + * @throws MQClientException + */ + public static void checkGroup(String group) throws MQClientException { + if (UtilAll.isBlank(group)) { + throw new MQClientException("the specified group is blank", null); + } + if (!regularExpressionMatcher(group, PATTERN)) { + throw new MQClientException(String.format( + "the specified group[%s] contains illegal characters, allowing only %s", group, + VALID_PATTERN_STR), null); + } + if 
(group.length() > CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified group is longer than group max length 255.", null); + } + } + + /** + * @param origin + * @param pattern + * + * @return <tt>true</tt> if, and only if, the entire origin sequence + * matches this matcher's pattern + */ + public static boolean regularExpressionMatcher(String origin, Pattern pattern) { + if (pattern == null) { + return true; + } + Matcher matcher = pattern.matcher(origin); + return matcher.matches(); + } + + /** + * Validate message + * + * @param msg + * @param defaultMQProducer + * + * @throws MQClientException + */ + public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) + throws MQClientException { + if (null == msg) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); + } + // topic + Validators.checkTopic(msg.getTopic()); + // body + if (null == msg.getBody()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); + } + + if (0 == msg.getBody().length) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); + } + + if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { + throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, + "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); + } + } + + /** + * Validate topic + * + * @param topic + * + * @throws MQClientException + */ + public static void checkTopic(String topic) throws MQClientException { + if (UtilAll.isBlank(topic)) { + throw new MQClientException("the specified topic is blank", null); + } + + if (!regularExpressionMatcher(topic, PATTERN)) { + throw new MQClientException(String.format( + "the specified topic[%s] contains illegal characters, allowing only %s", topic, + VALID_PATTERN_STR), null); + } + + if (topic.length() > CHARACTER_MAX_LENGTH) { + throw new MQClientException("the specified topic is longer than topic 
max length 255.", null); + } + + //whether the same with system reserved keyword + if (topic.equals(MixAll.DEFAULT_TOPIC)) { + throw new MQClientException( + String.format("the topic[%s] is conflict with default topic.", topic), null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java new file mode 100644 index 0000000..cfff17e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Marker interface: it declares no methods or constants of its own.
 *
 * @author shijia.wxr
 */
public interface MQAdminExtInner {
}
/**
 * Integer error codes declared for the client; the values are consecutive, starting at 10001.
 */
public class ClientErrorCode {
    /** Code 10001. */
    public static final int CONNECT_BROKER_EXCEPTION = 10001;

    /** Code 10002. */
    public static final int ACCESS_BROKER_TIMEOUT = 10002;

    /** Code 10003. */
    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;

    /** Code 10004. */
    public static final int NO_NAME_SERVER_EXCEPTION = 10004;

    /** Code 10005. */
    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
}
/**
 * Per-thread, non-negative counter that starts from a random value.
 * <p>
 * Each calling thread gets its own sequence; the counter wraps to {@code 0} instead of going
 * negative.
 */
public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    /**
     * @param value
     *         currently ignored: the starting point is always chosen randomly per thread
     */
    public ThreadLocalIndex(int value) {

    }

    /**
     * Advances the calling thread's counter by one and returns the new value.
     * <p>
     * Despite the name, the value returned is the one <em>after</em> the increment. On a
     * thread's first call the counter is seeded with a random non-negative number before being
     * advanced.
     *
     * @return the incremented counter, always {@code >= 0}; {@code 0} follows
     * {@link Integer#MAX_VALUE}
     */
    public int getAndIncrement() {
        Integer current = this.threadLocalIndex.get();
        if (null == current) {
            current = clampToZero(Math.abs(random.nextInt()));
        }

        int next = clampToZero(Math.abs(current + 1));
        this.threadLocalIndex.set(next);
        return next;
    }

    /**
     * Maps negative values to zero. {@code Math.abs(Integer.MIN_VALUE)} is still negative, so
     * the result of {@code Math.abs} cannot be trusted to be non-negative on its own.
     */
    private static int clampToZero(int candidate) {
        return candidate < 0 ? 0 : candidate;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * Strategy Algorithm for message allocating between consumers + * + * @author shijia.wxr + * @author vongosling + */ +public interface AllocateMessageQueueStrategy { + + /** + * Allocating by consumer id + * + * @param consumerGroup + * current consumer group + * @param currentCID + * current consumer id + * @param mqAll + * message queue set in current topic + * @param cidAll + * consumer set in current consumer group + * + * @return The allocate result of given strategy + */ + List<MessageQueue> allocate( + final String consumerGroup, + final String currentCID, + final List<MessageQueue> mqAll, + final List<String> cidAll + ); + + + /** + * Algorithm name + * + * @return The strategy name + */ + String getName(); +}
