http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java new file mode 100644 index 0000000..6f861d3 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.client.ClientConfig; +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.RemotingException; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class DefaultMQProducer extends ClientConfig implements MQProducer { + protected final transient DefaultMQProducerImpl defaultMQProducerImpl; + private String producerGroup; + /** + * Just for testing or demo program + */ + private String createTopicKey = MixAll.DEFAULT_TOPIC; + private volatile int defaultTopicQueueNums = 4; + private int sendMsgTimeout = 3000; + private int compressMsgBodyOverHowmuch = 1024 * 4; + private int retryTimesWhenSendFailed = 2; + private int retryTimesWhenSendAsyncFailed = 2; + + private boolean retryAnotherBrokerWhenNotStoreOK = false; + private int maxMessageSize = 1024 * 1024 * 4; // 4M + public DefaultMQProducer() { + this(MixAll.DEFAULT_PRODUCER_GROUP, null); + } + + + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + } + + + public DefaultMQProducer(final String producerGroup) { + this(producerGroup, null); + } + + + public DefaultMQProducer(RPCHook rpcHook) { + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); + } + + + @Override + public void start() throws MQClientException { + this.defaultMQProducerImpl.start(); + } + + @Override + public void shutdown() { + this.defaultMQProducerImpl.shutdown(); + } + + + @Override + public List<MessageQueue> fetchPublishMessageQueues(String topic) 
throws MQClientException { + return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); + } + + + @Override + public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg); + } + + + @Override + public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, timeout); + } + + + @Override + public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, sendCallback); + } + + + @Override + public void send(Message msg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg); + } + + + @Override + public SendResult send(Message msg, MessageQueue mq) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, mq); + } + + + @Override + public SendResult send(Message msg, MessageQueue mq, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, mq, timeout); + } + + + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, mq, sendCallback); + } + + + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, 
mq, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg, mq); + } + + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, selector, arg); + } + + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); + } + + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); + } + + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); + } + + + @Override + public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQProducerImpl.sendOneway(msg, selector, arg); + } + + + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) + throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + + @Override + public 
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.defaultMQProducerImpl.searchOffset(mq, timestamp); + } + + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.maxOffset(mq); + } + + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.minOffset(mq); + } + + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); + } + + + @Override + public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQProducerImpl.viewMessage(offsetMsgId); + } + + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); + } + + + @Override + public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId); + return this.viewMessage(msgId); + } catch (Exception e) { + } + return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId); + } + + public String getProducerGroup() { + return producerGroup; + } + + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + + public String getCreateTopicKey() { + return createTopicKey; + } + + + public void setCreateTopicKey(String createTopicKey) { + this.createTopicKey = createTopicKey; + } + + + public 
int getSendMsgTimeout() { + return sendMsgTimeout; + } + + + public void setSendMsgTimeout(int sendMsgTimeout) { + this.sendMsgTimeout = sendMsgTimeout; + } + + + public int getCompressMsgBodyOverHowmuch() { + return compressMsgBodyOverHowmuch; + } + + + public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { + this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; + } + + + public DefaultMQProducerImpl getDefaultMQProducerImpl() { + return defaultMQProducerImpl; + } + + + public boolean isRetryAnotherBrokerWhenNotStoreOK() { + return retryAnotherBrokerWhenNotStoreOK; + } + + + public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) { + this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK; + } + + + public int getMaxMessageSize() { + return maxMessageSize; + } + + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + + public int getDefaultTopicQueueNums() { + return defaultTopicQueueNums; + } + + + public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { + this.defaultTopicQueueNums = defaultTopicQueueNums; + } + + + public int getRetryTimesWhenSendFailed() { + return retryTimesWhenSendFailed; + } + + + public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { + this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; + } + + + public boolean isSendMessageWithVIPChannel() { + return isVipChannelEnabled(); + } + + + public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) { + this.setVipChannelEnabled(sendMessageWithVIPChannel); + } + + + public long[] getNotAvailableDuration() { + return this.defaultMQProducerImpl.getNotAvailableDuration(); + } + + public void setNotAvailableDuration(final long[] notAvailableDuration) { + this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration); + } + + public long[] getLatencyMax() { + return this.defaultMQProducerImpl.getLatencyMax(); + } 
+ + public void setLatencyMax(final long[] latencyMax) { + this.defaultMQProducerImpl.setLatencyMax(latencyMax); + } + + public boolean isSendLatencyFaultEnable() { + return this.defaultMQProducerImpl.isSendLatencyFaultEnable(); + } + + public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { + this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable); + } + + public int getRetryTimesWhenSendAsyncFailed() { + return retryTimesWhenSendAsyncFailed; + } + + public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) { + this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java new file mode 100644 index 0000000..af3723a --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.common.message.Message; + + +/** + * @author shijia.wxr + */ +public interface LocalTransactionExecuter { + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java new file mode 100644 index 0000000..ee2a93a --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Outcome reported for a local transaction.
 *
 * <p>Constant names and their order are part of the public API and must not change.
 * NOTE(review): the per-constant meanings are read off the names only -- confirm.
 *
 * @author shijia.wxr
 */
public enum LocalTransactionState {
    /** Presumably: the local transaction succeeded and the message may be committed. */
    COMMIT_MESSAGE,

    /** Presumably: the local transaction failed and the message should be rolled back. */
    ROLLBACK_MESSAGE,

    /** Presumably: the outcome is not known yet (spelling is historical; do not rename). */
    UNKNOW
}
+ */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.client.MQAdmin; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.remoting.exception.RemotingException; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface MQProducer extends MQAdmin { + void start() throws MQClientException; + + void shutdown(); + + + List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; + + + SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + + SendResult send(final Message msg, final long timeout) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + + void send(final Message msg, final SendCallback sendCallback) throws MQClientException, + RemotingException, InterruptedException; + + + void send(final Message msg, final SendCallback sendCallback, final long timeout) + throws MQClientException, RemotingException, InterruptedException; + + + void sendOneway(final Message msg) throws MQClientException, RemotingException, + InterruptedException; + + + SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueue mq, final long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + + + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) + throws MQClientException, RemotingException, InterruptedException; + + + void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) + throws MQClientException, RemotingException, 
InterruptedException; + + + void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, + RemotingException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + + + SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + + void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback) throws MQClientException, RemotingException, + InterruptedException; + + + void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, + InterruptedException; + + + void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) + throws MQClientException, RemotingException, InterruptedException; + + + TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java new file mode 100644 index 0000000..924c145 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface MessageQueueSelector { + MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java new file mode 100644 index 0000000..35d1a72 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public interface SendCallback { + public void onSuccess(final SendResult sendResult); + + + public void onException(final Throwable e); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java new file mode 100644 index 0000000..183accf --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.fastjson.JSON; +import com.alibaba.rocketmq.common.message.MessageQueue; + + +/** + * @author shijia.wxr + */ +public class SendResult { + private SendStatus sendStatus; + private String msgId; + private MessageQueue messageQueue; + private long queueOffset; + private String transactionId; + private String offsetMsgId; + private String regionId; + private boolean traceOn = true; + + public SendResult() { + } + + public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) { + this.sendStatus = sendStatus; + this.msgId = msgId; + this.offsetMsgId = offsetMsgId; + this.messageQueue = messageQueue; + this.queueOffset = queueOffset; + } + + public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) { + this.sendStatus = sendStatus; + this.msgId = msgId; + this.messageQueue = messageQueue; + this.queueOffset = queueOffset; + this.transactionId = transactionId; + this.offsetMsgId = offsetMsgId; + this.regionId = regionId; + } + + public boolean isTraceOn() { + return traceOn; + } + + public void setTraceOn(final boolean traceOn) { + this.traceOn = traceOn; + } + + public String getRegionId() { + return regionId; + } + + public void setRegionId(final String regionId) { + this.regionId = regionId; + } + + public static String encoderSendResultToJson(final Object obj) { + return 
JSON.toJSONString(obj); + } + + public static SendResult decoderSendResultFromJson(String json) { + return JSON.parseObject(json, SendResult.class); + } + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public SendStatus getSendStatus() { + return sendStatus; + } + + + public void setSendStatus(SendStatus sendStatus) { + this.sendStatus = sendStatus; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public long getQueueOffset() { + return queueOffset; + } + + + public void setQueueOffset(long queueOffset) { + this.queueOffset = queueOffset; + } + + + public String getTransactionId() { + return transactionId; + } + + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public String getOffsetMsgId() { + return offsetMsgId; + } + + public void setOffsetMsgId(String offsetMsgId) { + this.offsetMsgId = offsetMsgId; + } + + @Override + public String toString() { + return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue + + ", queueOffset=" + queueOffset + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java new file mode 100644 index 0000000..3bc572f --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Status carried by a {@code SendResult}.
 *
 * <p>Constant names and their order are part of the public API and must not change.
 * NOTE(review): the per-constant meanings are read off the names only -- confirm.
 *
 * @author shijia.wxr
 */
public enum SendStatus {
    /** Presumably: the send completed without any problem. */
    SEND_OK,

    /** Presumably: flushing to disk did not finish in time. */
    FLUSH_DISK_TIMEOUT,

    /** Presumably: replication to the slave did not finish in time. */
    FLUSH_SLAVE_TIMEOUT,

    /** Presumably: no slave was available. */
    SLAVE_NOT_AVAILABLE
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.common.message.MessageExt; + + +/** + * @author shijia.wxr + */ +public interface TransactionCheckListener { + LocalTransactionState checkLocalTransactionState(final MessageExt msg); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java new file mode 100644 index 0000000..08dd4ab --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.remoting.RPCHook; + + +/** + * @author shijia.wxr + */ +public class TransactionMQProducer extends DefaultMQProducer { + private TransactionCheckListener transactionCheckListener; + private int checkThreadPoolMinSize = 1; + private int checkThreadPoolMaxSize = 1; + private int checkRequestHoldMax = 2000; + + + public TransactionMQProducer() { + } + + + public TransactionMQProducer(final String producerGroup) { + super(producerGroup); + } + + public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { + super(producerGroup, rpcHook); + } + + @Override + public void start() throws MQClientException { + this.defaultMQProducerImpl.initTransactionEnv(); + super.start(); + } + + + @Override + public void shutdown() { + super.shutdown(); + this.defaultMQProducerImpl.destroyTransactionEnv(); + } + + + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { + if (null == this.transactionCheckListener) { + throw new MQClientException("localTransactionBranchCheckListener is null", null); + } + + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); + } + + + public TransactionCheckListener getTransactionCheckListener() { + return transactionCheckListener; + } + + + public void 
setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } + + + public int getCheckThreadPoolMinSize() { + return checkThreadPoolMinSize; + } + + + public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { + this.checkThreadPoolMinSize = checkThreadPoolMinSize; + } + + + public int getCheckThreadPoolMaxSize() { + return checkThreadPoolMaxSize; + } + + + public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { + this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; + } + + + public int getCheckRequestHoldMax() { + return checkRequestHoldMax; + } + + + public void setCheckRequestHoldMax(int checkRequestHoldMax) { + this.checkRequestHoldMax = checkRequestHoldMax; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java new file mode 100644 index 0000000..e7dcd0e --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer; + +/** + * @author shijia.wxr + */ +public class TransactionSendResult extends SendResult { + private LocalTransactionState localTransactionState; + + + public TransactionSendResult() { + } + + + public LocalTransactionState getLocalTransactionState() { + return localTransactionState; + } + + + public void setLocalTransactionState(LocalTransactionState localTransactionState) { + this.localTransactionState = localTransactionState; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java new file mode 100644 index 0000000..648356b --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer.selector; + +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByHash implements MessageQueueSelector { + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + int value = arg.hashCode(); + if (value < 0) { + value = Math.abs(value); + } + + value = value % mqs.size(); + return mqs.get(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java new file mode 100644 index 0000000..a213391 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer.selector; + +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { + private Set<String> consumeridcs; + + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + return null; + } + + + public Set<String> getConsumeridcs() { + return consumeridcs; + } + + + public void setConsumeridcs(Set<String> consumeridcs) { + this.consumeridcs = consumeridcs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java new file mode 100644 index 0000000..3f381e4 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java @@ -0,0 +1,44 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.producer.selector; + +import com.alibaba.rocketmq.client.producer.MessageQueueSelector; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.Random; + + +/** + * @author shijia.wxr + */ +public class SelectMessageQueueByRandoom implements MessageQueueSelector { + private Random random = new Random(System.currentTimeMillis()); + + + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + int value = random.nextInt(); + if (value < 0) { + value = Math.abs(value); + } + + value = value % mqs.size(); + return mqs.get(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java new file mode 100644 index 0000000..e07e233 --- 
/dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.client.stat; + +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; +import com.alibaba.rocketmq.common.stats.StatsItemSet; +import com.alibaba.rocketmq.common.stats.StatsSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + + +public class ConsumerStatsManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); + + private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS"; + private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS"; + private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT"; + private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS"; + private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT"; + + private final StatsItemSet topicAndGroupConsumeOKTPS; + private final StatsItemSet topicAndGroupConsumeRT; + private final StatsItemSet 
topicAndGroupConsumeFailedTPS; + private final StatsItemSet topicAndGroupPullTPS; + private final StatsItemSet topicAndGroupPullRT; + + + public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) { + this.topicAndGroupConsumeOKTPS = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log); + + this.topicAndGroupConsumeRT = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log); + + this.topicAndGroupConsumeFailedTPS = + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log); + + this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log); + + this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log); + } + + + public void start() { + } + + + public void shutdown() { + } + + + public void incPullRT(final String group, final String topic, final long rt) { + this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); + } + + + public void incPullTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + + public void incConsumeRT(final String group, final String topic, final long rt) { + this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); + } + + + public void incConsumeOKTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + + public void incConsumeFailedTPS(final String group, final String topic, final long msgs) { + this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1); + } + + public ConsumeStatus consumeStatus(final String group, final String topic) { + ConsumeStatus cs = new ConsumeStatus(); + { + StatsSnapshot ss = this.getPullRT(group, topic); + if (ss != null) { + cs.setPullRT(ss.getAvgpt()); + } + } + + { + StatsSnapshot ss = 
this.getPullTPS(group, topic); + if (ss != null) { + cs.setPullTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.getConsumeRT(group, topic); + if (ss != null) { + cs.setConsumeRT(ss.getAvgpt()); + } + } + + { + StatsSnapshot ss = this.getConsumeOKTPS(group, topic); + if (ss != null) { + cs.setConsumeOKTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.getConsumeFailedTPS(group, topic); + if (ss != null) { + cs.setConsumeFailedTPS(ss.getTps()); + } + } + + { + StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group); + if (ss != null) { + cs.setConsumeFailedMsgs(ss.getSum()); + } + } + + return cs; + } + + private StatsSnapshot getPullRT(final String group, final String topic) { + return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getPullTPS(final String group, final String topic) { + return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getConsumeRT(final String group, final String topic) { + StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group); + if (0 == statsData.getSum()) { + statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group); + } + + return statsData; + } + + private StatsSnapshot getConsumeOKTPS(final String group, final String topic) { + return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group); + } + + private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) { + return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml 
b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml new file mode 100644 index 0000000..bf4b885 --- /dev/null +++ b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="STDOUT-APPENDER" class="org.apache.log4j.ConsoleAppender"> + <param name="encoding" value="UTF-8"/> + <param name="target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%-5p %c{2} , %m%n"/> + </layout> + </appender> + + <appender name="RocketmqClientAppender" class="org.apache.log4j.RollingFileAppender"> + <param name="file" value="${client.logRoot}/rocketmq_client.log"/> + <param name="append" value="true"/> + <param name="encoding" value="UTF-8"/> + <param name="maxFileSize" value="1073741824"/> + <param name="maxBackupIndex" value="${client.logFileMaxIndex}"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/> + </layout> + </appender> + + <logger name="RocketmqClient" additivity="false"> + <level value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + + <logger name="RocketmqCommon" additivity="false"> + <level value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + + <logger name="RocketmqRemoting" additivity="false"> + <level value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + +</log4j:configuration> + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/resources/logback_rocketmq_client.xml b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml new file mode 100644 index 0000000..a845ee4 --- /dev/null +++ b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml @@ -0,0 +1,58 @@ +<?xml version="1.0"
encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<configuration> + <appender name="RocketmqClientAppender" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${client.logRoot}/rocketmq_client.log</file> + <append>true</append> + <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> + <fileNamePattern>${client.logRoot}/otherdays/rocketmq_client.%i.log + </fileNamePattern> + <minIndex>1</minIndex> + <maxIndex>${client.logFileMaxIndex}</maxIndex> + </rollingPolicy> + <triggeringPolicy + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + <maxFileSize>100MB</maxFileSize> + </triggeringPolicy> + <encoder> + <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern> + <charset class="java.nio.charset.Charset">UTF-8</charset> + </encoder> + </appender> + + <logger name="RocketmqCommon" additivity="false"> + <level value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + + + <logger name="RocketmqRemoting" additivity="false"> + <level value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + + + <logger name="RocketmqClient" additivity="false"> + <level
value="${client.logLevel}"/> + <appender-ref ref="RocketmqClientAppender"/> + </logger> + + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java new file mode 100644 index 0000000..6dadafb --- /dev/null +++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.client; + +import org.junit.Assert; +import org.junit.Test; + + +public class ValidatorsTest { + + @Test + public void topicValidatorTest() { + try { + Validators.checkTopic("Hello"); + Validators.checkTopic("%RETRY%Hello"); + Validators.checkTopic("_%RETRY%Hello"); + Validators.checkTopic("-%RETRY%Hello"); + Validators.checkTopic("223-%RETRY%Hello"); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java new file mode 100644 index 0000000..5ef75ed --- /dev/null +++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * @author [email protected] + * @version $id$ + */ +package com.alibaba.rocketmq.client.consumer.loadbalance; + +import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; +import com.alibaba.rocketmq.common.message.MessageQueue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author [email protected] created on 2013-07-03 16:24 + */ +public class AllocateMessageQueueAveragelyTest { + private AllocateMessageQueueStrategy allocateMessageQueueAveragely; + private String currentCID; + private String topic; + private List<MessageQueue> messageQueueList; + private List<String> consumerIdList; + + @Before + public void init() { + allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); + topic = "topic_test"; + } + + @Test + public void testConsumer1() { + currentCID = "0"; + createConsumerIdList(1); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer1"); + Assert.assertEquals(result.size(), 5); + Assert.assertEquals(result.containsAll(getMessageQueueList()), true); + } + + public void createConsumerIdList(int size) { + consumerIdList = new ArrayList<String>(size); + for (int i = 0; i < size; i++) { + consumerIdList.add(String.valueOf(i)); + } + } + + public void createMessageQueueList(int size) { + messageQueueList = new ArrayList<MessageQueue>(size); + for (int i = 0; i < size; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + messageQueueList.add(mq); + } + } + + public void printMessageQueue(List<MessageQueue> messageQueueList, String name) { + if (messageQueueList == null || 
messageQueueList.size() < 1) + return; + System.out.println(name + ".......................................start"); + for (MessageQueue messageQueue : messageQueueList) { + System.out.println(messageQueue); + } + System.out.println(name + ".......................................end"); + } + + public List<MessageQueue> getMessageQueueList() { + return messageQueueList; + } + + public void setMessageQueueList(List<MessageQueue> messageQueueList) { + this.messageQueueList = messageQueueList; + } + + @Test + public void testConsumer2() { + currentCID = "1"; + createConsumerIdList(2); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer2"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); + + } + + @Test + public void testConsumer3CurrentCID0() { + currentCID = "0"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID0"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true); + } + + @Test + public void testConsumer3CurrentCID1() { + currentCID = "1"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID1"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer3CurrentCID2() { + currentCID = "2"; + createConsumerIdList(3); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", 
currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer3CurrentCID2"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); + } + + @Test + public void testConsumer4() { + currentCID = "1"; + createConsumerIdList(4); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer4"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer5() { + currentCID = "1"; + createConsumerIdList(5); + createMessageQueueList(5); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer5"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); + } + + @Test + public void testConsumer6() { + currentCID = "1"; + createConsumerIdList(2); + createMessageQueueList(6); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testConsumer"); + Assert.assertEquals(result.size(), 3); + Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true); + } + + @Test + public void testCurrentCIDNotExists() { + currentCID = String.valueOf(Integer.MAX_VALUE); + createConsumerIdList(2); + createMessageQueueList(6); + List<MessageQueue> result = + allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); + printMessageQueue(result, "testCurrentCIDNotExists"); + Assert.assertEquals(result.size(), 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testCurrentCIDIllegalArgument() { + createConsumerIdList(2); + 
createMessageQueueList(6); + allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList()); + } + + public List<String> getConsumerIdList() { + return consumerIdList; + } + + public void setConsumerIdList(List<String> consumerIdList) { + this.consumerIdList = consumerIdList; + } + + @Test(expected = IllegalArgumentException.class) + public void testMessageQueueIllegalArgument() { + currentCID = "0"; + createConsumerIdList(2); + allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConsumerIdIllegalArgument() { + currentCID = "0"; + createMessageQueueList(6); + allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null); + } + + @Test + public void testAllocate() { + AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); + String topic = "topic_test"; + String currentCID = "CID"; + int queueSize = 19; + int consumerSize = 10; + List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); + for (int i = 0; i < queueSize; i++) { + MessageQueue mq = new MessageQueue(topic, "brokerName", i); + mqAll.add(mq); + } + + List<String> cidAll = new ArrayList<String>(); + for (int j = 0; j < consumerSize; j++) { + cidAll.add("CID" + j); + } + System.out.println(mqAll.toString()); + System.out.println(cidAll.toString()); + for (int i = 0; i < consumerSize; i++) { + List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll); + System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); + } + } + + + @Test + public void testAllocateByCircle() { + AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle(); + String topic = "topic_test"; + String currentCID = "CID"; + int consumerSize = 3; + int queueSize = 13; + List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); + for (int i = 0; i < queueSize; i++) { + 
MessageQueue mq = new MessageQueue(topic, "brokerName", i); + mqAll.add(mq); + } + + List<String> cidAll = new ArrayList<String>(); + for (int j = 0; j < consumerSize; j++) { + cidAll.add("CID" + j); + } + System.out.println(mqAll.toString()); + System.out.println(cidAll.toString()); + for (int i = 0; i < consumerSize; i++) { + List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll); + System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-common/pom.xml b/rocketmq-common/pom.xml new file mode 100644 index 0000000..72cc2b0 --- /dev/null +++ b/rocketmq-common/pom.xml @@ -0,0 +1,43 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-common</artifactId> + <name>rocketmq-common ${project.version}</name> + + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-remoting</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java new file mode 100644 index 0000000..6eae0a7 --- /dev/null +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java @@ -0,0 +1,540 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.common;

import com.alibaba.rocketmq.common.annotation.ImportantField;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;

import java.net.InetAddress;
import java.net.UnknownHostException;


/**
 * Mutable holder for broker settings: every option is a private field with a
 * default value and a JavaBean-style getter/setter pair.
 *
 * <p>Several defaults are resolved when an instance is constructed: system
 * properties / environment variables for the home directory and name-server
 * address, the local address reported by {@link RemotingUtil}, and the local
 * host name (see {@link #localHostName()}).
 *
 * <p>The class performs no validation and no synchronization; setters store
 * whatever value they are given.
 *
 * @author shijia.wxr
 */
public class BrokerConfig {
    // System property wins; the environment variable is only the fallback (may be null if neither is set).
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // Same lookup order as rocketmqHome: system property first, then environment variable.
    @ImportantField
    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
    @ImportantField
    private String brokerIP1 = RemotingUtil.getLocalAddress();
    private String brokerIP2 = RemotingUtil.getLocalAddress();
    // Defaults to the local host name, or "DEFAULT_BROKER" when it cannot be resolved.
    @ImportantField
    private String brokerName = localHostName();
    @ImportantField
    private String brokerClusterName = "DefaultCluster";
    @ImportantField
    private long brokerId = MixAll.MASTER_ID;
    // Bit mask: readable and writable by default.
    private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
    private int defaultTopicQueueNums = 8;
    @ImportantField
    private boolean autoCreateTopicEnable = true;

    private boolean clusterTopicEnable = true;

    private boolean brokerTopicEnable = true;
    @ImportantField
    private boolean autoCreateSubscriptionGroup = true;
    private String messageStorePlugIn = "";

    // Fixed at 1; a CPU-scaled default (16 + availableProcessors * 4) was used previously and is kept here for reference.
    private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
    // Scales with the number of processors visible to the JVM at construction time.
    private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
    private int adminBrokerThreadPoolNums = 16;
    private int clientManageThreadPoolNums = 32;
    private int consumerManageThreadPoolNums = 32;

    // NOTE(review): presumably milliseconds (5 s) - confirm against the scheduler that reads it.
    private int flushConsumerOffsetInterval = 1000 * 5;

    // NOTE(review): presumably milliseconds (60 s) - confirm against the scheduler that reads it.
    private int flushConsumerOffsetHistoryInterval = 1000 * 60;

    @ImportantField
    private boolean rejectTransactionMessage = false;
    @ImportantField
    private boolean fetchNamesrvAddrByAddressServer = false;
    private int sendThreadPoolQueueCapacity = 10000;
    private int pullThreadPoolQueueCapacity = 100000;
    private int clientManagerThreadPoolQueueCapacity = 1000000;
    private int consumerManagerThreadPoolQueueCapacity = 1000000;

    private int filterServerNums = 0;

    private boolean longPollingEnable = true;

    private long shortPollingTimeMills = 1000;

    private boolean notifyConsumerIdsChangedEnable = true;

    private boolean highSpeedMode = false;

    private boolean commercialEnable = true;
    private int commercialTimerCount = 1;
    private int commercialTransCount = 1;
    private int commercialBigCount = 1;

    private boolean transferMsgByHeap = true;
    private int maxDelayTime = 40;


    private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
    private int registerBrokerTimeoutMills = 6000;

    private boolean slaveReadEnable = false;

    private boolean disableConsumeIfConsumerReadSlowly = false;
    // 16 * 1024^3 = 17179869184. The leading long literal is required: with
    // int operands the product (2^34) wraps around to 0 before it is widened
    // to long. NOTE(review): the unit is presumably bytes - confirm at the use site.
    private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;

    private long waitTimeMillsInSendQueue = 200;

    private long startAcceptSendRequestTimeStamp = 0L;

    private boolean traceOn = true;

    public boolean isTraceOn() {
        return traceOn;
    }

    public void setTraceOn(final boolean traceOn) {
        this.traceOn = traceOn;
    }

    public long getStartAcceptSendRequestTimeStamp() {
        return startAcceptSendRequestTimeStamp;
    }

    public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
        this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
    }

    public long getWaitTimeMillsInSendQueue() {
        return waitTimeMillsInSendQueue;
    }

    public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
        this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
    }

    public long getConsumerFallbehindThreshold() {
        return consumerFallbehindThreshold;
    }

    public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
        this.consumerFallbehindThreshold = consumerFallbehindThreshold;
    }

    public boolean isDisableConsumeIfConsumerReadSlowly() {
        return disableConsumeIfConsumerReadSlowly;
    }

    public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
        this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
    }

    public boolean isSlaveReadEnable() {
        return slaveReadEnable;
    }

    public void setSlaveReadEnable(final boolean slaveReadEnable) {
        this.slaveReadEnable = slaveReadEnable;
    }

    /**
     * Resolves the host name of the local machine.
     *
     * @return the name reported by {@link InetAddress#getLocalHost()}, or the
     *         literal {@code "DEFAULT_BROKER"} when the local host cannot be
     *         resolved (the stack trace is printed to standard error in that case)
     */
    public static String localHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Best effort: report the failure and fall back to a fixed name
            // so that constructing a BrokerConfig never fails.
            e.printStackTrace();
            return "DEFAULT_BROKER";
        }
    }

    public int getRegisterBrokerTimeoutMills() {
        return registerBrokerTimeoutMills;
    }

    public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
        this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
    }

    public String getRegionId() {
        return regionId;
    }

    public void setRegionId(final String regionId) {
        this.regionId = regionId;
    }

    public boolean isTransferMsgByHeap() {
        return transferMsgByHeap;
    }

    public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
        this.transferMsgByHeap = transferMsgByHeap;
    }

    public String getMessageStorePlugIn() {
        return messageStorePlugIn;
    }

    public void setMessageStorePlugIn(String messageStorePlugIn) {
        this.messageStorePlugIn = messageStorePlugIn;
    }

    public boolean isHighSpeedMode() {
        return highSpeedMode;
    }

    public void setHighSpeedMode(final boolean highSpeedMode) {
        this.highSpeedMode = highSpeedMode;
    }

    public String getRocketmqHome() {
        return rocketmqHome;
    }

    public void setRocketmqHome(String rocketmqHome) {
        this.rocketmqHome = rocketmqHome;
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    public int getBrokerPermission() {
        return brokerPermission;
    }

    public void setBrokerPermission(int brokerPermission) {
        this.brokerPermission = brokerPermission;
    }

    public int getDefaultTopicQueueNums() {
        return defaultTopicQueueNums;
    }

    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
        this.defaultTopicQueueNums = defaultTopicQueueNums;
    }

    public boolean isAutoCreateTopicEnable() {
        return autoCreateTopicEnable;
    }

    public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
        this.autoCreateTopicEnable = autoCreateTopic;
    }

    public String getBrokerClusterName() {
        return brokerClusterName;
    }

    public void setBrokerClusterName(String brokerClusterName) {
        this.brokerClusterName = brokerClusterName;
    }

    public String getBrokerIP1() {
        return brokerIP1;
    }

    public void setBrokerIP1(String brokerIP1) {
        this.brokerIP1 = brokerIP1;
    }

    public String getBrokerIP2() {
        return brokerIP2;
    }

    public void setBrokerIP2(String brokerIP2) {
        this.brokerIP2 = brokerIP2;
    }

    public int getSendMessageThreadPoolNums() {
        return sendMessageThreadPoolNums;
    }

    public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
        this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
    }

    public int getPullMessageThreadPoolNums() {
        return pullMessageThreadPoolNums;
    }

    public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
        this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
    }

    public int getAdminBrokerThreadPoolNums() {
        return adminBrokerThreadPoolNums;
    }

    public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
        this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
    }

    public int getFlushConsumerOffsetInterval() {
        return flushConsumerOffsetInterval;
    }

    public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
        this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
    }

    public int getFlushConsumerOffsetHistoryInterval() {
        return flushConsumerOffsetHistoryInterval;
    }

    public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
        this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
    }

    public boolean isClusterTopicEnable() {
        return clusterTopicEnable;
    }

    public void setClusterTopicEnable(boolean clusterTopicEnable) {
        this.clusterTopicEnable = clusterTopicEnable;
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public long getBrokerId() {
        return brokerId;
    }

    public void setBrokerId(long brokerId) {
        this.brokerId = brokerId;
    }

    public boolean isAutoCreateSubscriptionGroup() {
        return autoCreateSubscriptionGroup;
    }

    public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
        this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
    }

    public boolean isRejectTransactionMessage() {
        return rejectTransactionMessage;
    }

    public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
        this.rejectTransactionMessage = rejectTransactionMessage;
    }

    public boolean isFetchNamesrvAddrByAddressServer() {
        return fetchNamesrvAddrByAddressServer;
    }

    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
    }

    public int getSendThreadPoolQueueCapacity() {
        return sendThreadPoolQueueCapacity;
    }

    public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
        this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
    }

    public int getPullThreadPoolQueueCapacity() {
        return pullThreadPoolQueueCapacity;
    }

    public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
        this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
    }

    public boolean isBrokerTopicEnable() {
        return brokerTopicEnable;
    }

    public void setBrokerTopicEnable(boolean brokerTopicEnable) {
        this.brokerTopicEnable = brokerTopicEnable;
    }

    public int getFilterServerNums() {
        return filterServerNums;
    }

    public void setFilterServerNums(int filterServerNums) {
        this.filterServerNums = filterServerNums;
    }

    public boolean isLongPollingEnable() {
        return longPollingEnable;
    }

    public void setLongPollingEnable(boolean longPollingEnable) {
        this.longPollingEnable = longPollingEnable;
    }

    public boolean isNotifyConsumerIdsChangedEnable() {
        return notifyConsumerIdsChangedEnable;
    }

    public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
    }

    public long getShortPollingTimeMills() {
        return shortPollingTimeMills;
    }

    public void setShortPollingTimeMills(long shortPollingTimeMills) {
        this.shortPollingTimeMills = shortPollingTimeMills;
    }

    public int getClientManageThreadPoolNums() {
        return clientManageThreadPoolNums;
    }

    public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
        this.clientManageThreadPoolNums = clientManageThreadPoolNums;
    }

    public boolean isCommercialEnable() {
        return commercialEnable;
    }

    public void setCommercialEnable(final boolean commercialEnable) {
        this.commercialEnable = commercialEnable;
    }

    public int getCommercialTimerCount() {
        return commercialTimerCount;
    }

    public void setCommercialTimerCount(final int commercialTimerCount) {
        this.commercialTimerCount = commercialTimerCount;
    }

    public int getCommercialTransCount() {
        return commercialTransCount;
    }

    public void setCommercialTransCount(final int commercialTransCount) {
        this.commercialTransCount = commercialTransCount;
    }

    public int getCommercialBigCount() {
        return commercialBigCount;
    }

    public void setCommercialBigCount(final int commercialBigCount) {
        this.commercialBigCount = commercialBigCount;
    }

    public int getMaxDelayTime() {
        return maxDelayTime;
    }

    public void setMaxDelayTime(final int maxDelayTime) {
        this.maxDelayTime = maxDelayTime;
    }

    public int getClientManagerThreadPoolQueueCapacity() {
        return clientManagerThreadPoolQueueCapacity;
    }

    public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
        this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
    }

    public int getConsumerManagerThreadPoolQueueCapacity() {
        return consumerManagerThreadPoolQueueCapacity;
    }

    public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
        this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
    }

    public int getConsumerManageThreadPoolNums() {
        return consumerManageThreadPoolNums;
    }

    public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
        this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
    }
}
