http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java new file mode 100644 index 0000000..3784752 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy { + private List<MessageQueue> messageQueueList; + + @Override + public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, + List<String> cidAll) { + return this.messageQueueList; + } + + @Override + public String getName() { + return "CONFIG"; + } + + public List<MessageQueue> getMessageQueueList() { + return messageQueueList; + } + + + public void setMessageQueueList(List<MessageQueue> messageQueueList) { + this.messageQueueList = messageQueueList; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java new file mode 100644 index 0000000..d3448c9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + + +/** + * Computer room Hashing queue algorithm, such as Alipay logic room + */ +public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy { + private Set<String> consumeridcs; + + @Override + public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, + List<String> cidAll) { + List<MessageQueue> result = new ArrayList<MessageQueue>(); + int currentIndex = cidAll.indexOf(currentCID); + if (currentIndex < 0) { + return result; + } + List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); + for (MessageQueue mq : mqAll) { + String[] temp = mq.getBrokerName().split("@"); + if (temp.length == 2 && consumeridcs.contains(temp[0])) { + premqAll.add(mq); + } + } + // Todo cid + int mod = premqAll.size() / cidAll.size(); + int rem = premqAll.size() % cidAll.size(); + int startindex = mod * currentIndex; + int endindex = startindex + mod; + for (int i = startindex; i < endindex; i++) { + result.add(mqAll.get(i)); + } + if (rem > currentIndex) { + result.add(premqAll.get(currentIndex + mod * cidAll.size())); + } + return result; + } + + @Override + public String getName() { + return "MACHINE_ROOM"; + } + + public Set<String> getConsumeridcs() { + return consumeridcs; + } + + + public void setConsumeridcs(Set<String> consumeridcs) { + this.consumeridcs = consumeridcs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java new file mode 100644 index 0000000..f4d87e7 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Local storage implementation + * + * @author shijia.wxr + */ +public class LocalFileOffsetStore implements OffsetStore { + public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty( + "rocketmq.client.localOffsetStoreDir", + System.getProperty("user.home") + File.separator + ".rocketmq_offsets"); + private final static Logger log = ClientLogger.getLog(); + private final MQClientInstance mQClientFactory; + private final String groupName; + private final String storePath; + private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + new ConcurrentHashMap<MessageQueue, AtomicLong>(); + + + public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { + this.mQClientFactory = mQClientFactory; + this.groupName = groupName; + this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + // + this.mQClientFactory.getClientId() + File.separator + // + this.groupName + File.separator + // + "offsets.json"; + } + + + @Override + public void load() throws MQClientException { + OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset(); + if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { + offsetTable.putAll(offsetSerializeWrapper.getOffsetTable()); + + for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) { + AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); + log.info("load consumer's offset, {} {} {}", + this.groupName, + mq, + offset.get()); + } + } + } + + + @Override + public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { + if (mq != null) { + AtomicLong offsetOld = this.offsetTable.get(mq); + if (null == offsetOld) { + offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); + } + + if (null != offsetOld) { + if (increaseOnly) { + MixAll.compareAndIncreaseOnly(offsetOld, offset); + } else { + offsetOld.set(offset); + } + } + } + } + + + @Override + public long readOffset(final MessageQueue mq, final ReadOffsetType type) { + if (mq != null) { + switch (type) { + case MEMORY_FIRST_THEN_STORE: + case READ_FROM_MEMORY: { + AtomicLong offset = this.offsetTable.get(mq); + if (offset != null) { + return offset.get(); + } else if (ReadOffsetType.READ_FROM_MEMORY == type) { + return -1; + } + } + case READ_FROM_STORE: { + OffsetSerializeWrapper offsetSerializeWrapper; + try { + offsetSerializeWrapper = this.readLocalOffset(); + } catch (MQClientException e) { + return -1; + } + if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) { + AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq); + if (offset != null) { + this.updateOffset(mq, offset.get(), false); + return offset.get(); + } + } + } + default: + break; + } + } + + return -1; + } + + + @Override + public void persistAll(Set<MessageQueue> mqs) { + if (null == mqs || mqs.isEmpty()) + return; + + OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); + for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + if (mqs.contains(entry.getKey())) { + AtomicLong offset = entry.getValue(); + offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); + } + } + + String jsonString = offsetSerializeWrapper.toJson(true); + if (jsonString != null) { + try { + MixAll.string2File(jsonString, this.storePath); + } catch (IOException e) { + log.error("persistAll consumer offset Exception, " + this.storePath, e); + } + } + } + + + @Override + public void persist(MessageQueue mq) { + } + + @Override + public void removeOffset(MessageQueue mq) { + + } + + @Override + public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + + } + + @Override + public Map<MessageQueue, Long> cloneOffsetTable(String topic) { + Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(); + for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + MessageQueue mq = entry.getKey(); + if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { + continue; + } + cloneOffsetTable.put(mq, entry.getValue().get()); + + } + return cloneOffsetTable; + } + + private OffsetSerializeWrapper readLocalOffset() throws MQClientException { + String content = MixAll.file2String(this.storePath); + if (null == content || content.length() == 0) { + return this.readLocalOffsetBak(); + } else { + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = + OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); + } catch (Exception e) { + log.warn("readLocalOffset Exception, and try to correct", e); + return this.readLocalOffsetBak(); + } + + return offsetSerializeWrapper; + } + } + + private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException { + String content = MixAll.file2String(this.storePath + ".bak"); + if (content != null && content.length() > 0) { + OffsetSerializeWrapper offsetSerializeWrapper = null; + try { + offsetSerializeWrapper = + OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); + } catch (Exception e) { + log.warn("readLocalOffset Exception", e); + throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" // + + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // + e); + } + return offsetSerializeWrapper; + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java new file mode 100644 index 0000000..e69ad23 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Wrapper class for offset serialization + * + * @author shijia.wxr + */ +public class OffsetSerializeWrapper extends RemotingSerializable { + private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + new ConcurrentHashMap<MessageQueue, AtomicLong>(); + + public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { + return offsetTable; + } + + public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { + this.offsetTable = offsetTable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java new file mode 100644 index 0000000..7c7ccc6 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.Map; +import java.util.Set; + + +/** + * Offset store interface + * + * @author shijia.wxr + */ +public interface OffsetStore { + /** + * Load + * + * @throws MQClientException + */ + void load() throws MQClientException; + + + /** + * Update the offset,store it in memory + * + * @param mq + * @param offset + * @param increaseOnly + */ + void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly); + + /** + * Get offset from local storage + * + * @param mq + * @param type + * + * @return The fetched offset + */ + long readOffset(final MessageQueue mq, final ReadOffsetType type); + + /** + * Persist all offsets,may be in local storage or remote name server + * + * @param mqs + */ + void persistAll(final Set<MessageQueue> mqs); + + /** + * Persist the offset,may be in local storage or remote name server + * + * @param mq + */ + void persist(final MessageQueue mq); + + /** + * Remove offset + * + * @param mq + */ + void removeOffset(MessageQueue mq); + + /** + * @param topic + * + * @return The cloned offset table of given topic + */ + Map<MessageQueue, Long> cloneOffsetTable(String topic); + + /** + * + * @param mq + * @param offset + * @param isOneway + */ + void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java new file mode 100644 index 0000000..c2ee9b7 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +public enum ReadOffsetType { + /** + * From memory + */ + READ_FROM_MEMORY, + /** + * From storage + */ + READ_FROM_STORE, + /** + * From memory,then from storage + */ + MEMORY_FIRST_THEN_STORE; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java new file mode 100644 index 0000000..082e7e8 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Remote storage implementation + * + * @author shijia.wxr + */ +public class RemoteBrokerOffsetStore implements OffsetStore { + private final static Logger log = ClientLogger.getLog(); + private final MQClientInstance mQClientFactory; + private final String groupName; + private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + new ConcurrentHashMap<MessageQueue, AtomicLong>(); + + + public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { + this.mQClientFactory = mQClientFactory; + this.groupName = groupName; + } + + + @Override + public void load() { + } + + + @Override + public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { + if (mq != null) { + AtomicLong offsetOld = this.offsetTable.get(mq); + if (null == offsetOld) { + offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); + } + + if (null != offsetOld) { + if (increaseOnly) { + MixAll.compareAndIncreaseOnly(offsetOld, offset); + } else { + offsetOld.set(offset); + } + } + } + } + + + @Override + public long readOffset(final MessageQueue mq, final ReadOffsetType type) { + if (mq != null) { + switch (type) { + case MEMORY_FIRST_THEN_STORE: + case READ_FROM_MEMORY: { + AtomicLong offset = this.offsetTable.get(mq); + if (offset != null) { + return offset.get(); + } else if (ReadOffsetType.READ_FROM_MEMORY == type) { + return -1; + } + } + case READ_FROM_STORE: { + try { + long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); + AtomicLong offset = new AtomicLong(brokerOffset); + this.updateOffset(mq, offset.get(), false); + return brokerOffset; + } + // No offset in broker + catch (MQBrokerException e) { + return -1; + } + //Other exceptions + catch (Exception e) { + log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); + return -2; + } + } + default: + break; + } + } + + return -1; + } + + + @Override + public void persistAll(Set<MessageQueue> mqs) { + if (null == mqs || mqs.isEmpty()) + return; + + final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); + if (mqs != null && !mqs.isEmpty()) { + for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + MessageQueue mq = entry.getKey(); + AtomicLong offset = entry.getValue(); + if (offset != null) { + if (mqs.contains(mq)) { + try { + this.updateConsumeOffsetToBroker(mq, offset.get()); + log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + this.groupName, + this.mQClientFactory.getClientId(), + mq, + offset.get()); + } catch (Exception e) { + log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); + } + } else { + unusedMQ.add(mq); + } + } + } + } + + if (!unusedMQ.isEmpty()) { + for (MessageQueue mq : unusedMQ) { + this.offsetTable.remove(mq); + log.info("remove unused mq, {}, {}", mq, this.groupName); + } + } + } + + + @Override + public void persist(MessageQueue mq) { + AtomicLong offset = this.offsetTable.get(mq); + if (offset != null) { + try { + this.updateConsumeOffsetToBroker(mq, offset.get()); + log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + this.groupName, + this.mQClientFactory.getClientId(), + mq, + offset.get()); + } catch (Exception e) { + log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); + } + } + } + + public void removeOffset(MessageQueue mq) { + if (mq != null) { + this.offsetTable.remove(mq); + log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq, + offsetTable.size()); + } + } + + @Override + public Map<MessageQueue, Long> cloneOffsetTable(String topic) { + Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(); + for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { + MessageQueue mq = entry.getKey(); + if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { + continue; + } + cloneOffsetTable.put(mq, entry.getValue().get()); + } + return cloneOffsetTable; + } + + /** + * Update the Consumer Offset in one way, once the Master is off, updated to Slave, + * here need to be optimized. + */ + private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + updateConsumeOffsetToBroker(mq, offset, true); + } + + /** + * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, + * here need to be optimized. + */ + @Override + public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + if (null == findBrokerResult) { + // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (findBrokerResult != null) { + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setCommitOffset(offset); + + if (isOneway) { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } else { + this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } + } else { + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + } + + private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + if (null == findBrokerResult) { + // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (findBrokerResult != null) { + QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setConsumerGroup(this.groupName); + requestHeader.setQueueId(mq.getQueueId()); + + return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( + findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + } else { + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java new file mode 100644 index 0000000..5e8d1b9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.exception; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; + + +/** + * @author shijia.wxr + */ +public class MQBrokerException extends Exception { + private static final long serialVersionUID = 5975020272601250368L; + private final int responseCode; + private final String errorMessage; + + + public MQBrokerException(int responseCode, String errorMessage) { + super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " + + errorMessage)); + this.responseCode = responseCode; + this.errorMessage = errorMessage; + } + + + public int getResponseCode() { + return responseCode; + } + + + public String getErrorMessage() { + return errorMessage; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java new file mode 100644 index 0000000..5f32d12 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.exception; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.help.FAQUrl; + + +/** + * @author shijia.wxr + */ +public class MQClientException extends Exception { + private static final long serialVersionUID = -5758410930844185841L; + private int responseCode; + private String errorMessage; + + + public MQClientException(String errorMessage, Throwable cause) { + super(FAQUrl.attachDefaultURL(errorMessage), cause); + this.responseCode = -1; + this.errorMessage = errorMessage; + } + + + public MQClientException(int responseCode, String errorMessage) { + super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " + + errorMessage)); + this.responseCode = responseCode; + this.errorMessage = errorMessage; + } + + public int getResponseCode() { + return responseCode; + } + + public MQClientException setResponseCode(final int responseCode) { + this.responseCode = responseCode; + return this; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(final String errorMessage) { + this.errorMessage = errorMessage; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java new file mode 100644 index 0000000..8cb4ca9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + + +/** + * @author manhong.yqd + */ +public class CheckForbiddenContext { + private String nameSrvAddr; + private String group; + private Message message; + private MessageQueue mq; + private String brokerAddr; + private CommunicationMode communicationMode; + private SendResult sendResult; + private Exception exception; + private Object arg; + private boolean unitMode = false; + + + public String getGroup() { + return group; + } + + + public void setGroup(String group) { + this.group = group; + } + + + public Message getMessage() { + return message; + } + + + public void setMessage(Message message) { + this.message = message; + } + + + public MessageQueue getMq() { + return mq; + } + + + public void setMq(MessageQueue mq) { + this.mq = mq; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public CommunicationMode getCommunicationMode() { + return communicationMode; + } + + + public void setCommunicationMode(CommunicationMode communicationMode) { + this.communicationMode = communicationMode; + } + + + public SendResult getSendResult() { + return sendResult; + } + + + public void setSendResult(SendResult sendResult) { + this.sendResult = sendResult; + } + + + public Exception getException() { + return exception; + } + + + public void setException(Exception exception) { + this.exception = exception; + } + + + public Object getArg() { + return arg; + } + + + public void setArg(Object arg) { + this.arg = arg; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + + public String getNameSrvAddr() { + return nameSrvAddr; + } + + + public void setNameSrvAddr(String nameSrvAddr) { + this.nameSrvAddr = nameSrvAddr; + } + + + @Override + public String toString() { + return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message + + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode + + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode + + ", arg=" + arg + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java new file mode 100644 index 0000000..41ed088 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.client.exception.MQClientException; + + +/** + * @author manhong.yqd + */ +public interface CheckForbiddenHook { + public String hookName(); + + + public void checkForbidden(final CheckForbiddenContext context) throws MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java new file mode 100644 index 0000000..f141fac --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; +import java.util.Map; + + +public class ConsumeMessageContext { + private String consumerGroup; + private List<MessageExt> msgList; + private MessageQueue mq; + private boolean success; + private String status; + private Object mqTraceContext; + private Map<String, String> props; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public List<MessageExt> getMsgList() { + return msgList; + } + + + public void setMsgList(List<MessageExt> msgList) { + this.msgList = msgList; + } + + + public MessageQueue getMq() { + return mq; + } + + + public void setMq(MessageQueue mq) { + this.mq = mq; + } + + + public boolean isSuccess() { + return success; + } + + + public void setSuccess(boolean success) { + this.success = success; + } + + + public Object getMqTraceContext() { + return mqTraceContext; + } + + + public void setMqTraceContext(Object mqTraceContext) { + this.mqTraceContext = mqTraceContext; + } + + + public Map<String, String> getProps() { + return props; + } + + + public void setProps(Map<String, String> props) { + this.props = props; + } + + + public String getStatus() { + return status; + } + + + public void setStatus(String status) { + this.status = status; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java new file mode 100644 index 0000000..8161d2e --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +public interface ConsumeMessageHook { + String hookName(); + + void consumeMessageBefore(final ConsumeMessageContext context); + + void consumeMessageAfter(final ConsumeMessageContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java new file mode 100644 index 0000000..942fd71 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + + +/** + * @author manhong.yqd + */ +public class FilterMessageContext { + private String consumerGroup; + private List<MessageExt> msgList; + private MessageQueue mq; + private Object arg; + private boolean unitMode; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public List<MessageExt> getMsgList() { + return msgList; + } + + + public void setMsgList(List<MessageExt> msgList) { + this.msgList = msgList; + } + + + public MessageQueue getMq() { + return mq; + } + + + public void setMq(MessageQueue mq) { + this.mq = mq; + } + + + public Object getArg() { + return arg; + } + + + public void setArg(Object arg) { + this.arg = arg; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + + @Override + public String toString() { + return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq=" + + mq + ", arg=" + arg + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java new file mode 100644 index 0000000..016ff56 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +/** + * @author manhong.yqd + */ +public interface FilterMessageHook { + public String hookName(); + + + public void filterMessage(final FilterMessageContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java new file mode 100644 index 0000000..bfb4a47 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageType; + +import java.util.Map; + + +public class SendMessageContext { + private String producerGroup; + private Message message; + private MessageQueue mq; + private String brokerAddr; + private String bornHost; + private CommunicationMode communicationMode; + private SendResult sendResult; + private Exception exception; + private Object mqTraceContext; + private Map<String, String> props; + private DefaultMQProducerImpl producer; + private MessageType msgType = MessageType.Normal_Msg; + + public MessageType getMsgType() { + return msgType; + } + + public void setMsgType(final MessageType msgType) { + this.msgType = msgType; + } + + public DefaultMQProducerImpl getProducer() { + return producer; + } + + public void setProducer(final DefaultMQProducerImpl producer) { + this.producer = producer; + } + + public String getProducerGroup() { + return producerGroup; + } + + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + + public Message getMessage() { + return message; + } + + + public void setMessage(Message message) { + this.message = message; + } + + + public MessageQueue getMq() { + return mq; + } + + + public void setMq(MessageQueue mq) { + this.mq = mq; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public CommunicationMode getCommunicationMode() { + return communicationMode; + } + + + public void setCommunicationMode(CommunicationMode communicationMode) { + this.communicationMode = communicationMode; + } + + + public SendResult getSendResult() { + return sendResult; + } + + + public void setSendResult(SendResult sendResult) { + this.sendResult = sendResult; + } + + + public Exception getException() { + return exception; + } + + + public void setException(Exception exception) { + this.exception = exception; + } + + + public Object getMqTraceContext() { + return mqTraceContext; + } + + + public void setMqTraceContext(Object mqTraceContext) { + this.mqTraceContext = mqTraceContext; + } + + + public Map<String, String> getProps() { + return props; + } + + + public void setProps(Map<String, String> props) { + this.props = props; + } + + + public String getBornHost() { + return bornHost; + } + + + public void setBornHost(String bornHost) { + this.bornHost = bornHost; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java new file mode 100644 index 0000000..c040831 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +public interface SendMessageHook { + String hookName(); + + void sendMessageBefore(final SendMessageContext context); + + void sendMessageAfter(final SendMessageContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java new file mode 100644 index 0000000..50e9b45 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl; + +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.MQProducerInner; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; +import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.common.protocol.header.*; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + + +/** + * @author shijia.wxr + */ +public class ClientRemotingProcessor implements NettyRequestProcessor { + private final Logger log = ClientLogger.getLog(); + private final MQClientInstance mqClientFactory; + + + public ClientRemotingProcessor(final MQClientInstance mqClientFactory) { + this.mqClientFactory = mqClientFactory; + } + + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + switch (request.getCode()) { + case RequestCode.CHECK_TRANSACTION_STATE: + return this.checkTransactionState(ctx, request); + case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: + return this.notifyConsumerIdsChanged(ctx, request); + case RequestCode.RESET_CONSUMER_CLIENT_OFFSET: + return this.resetOffset(ctx, request); + case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT: + return this.getConsumeStatus(ctx, request); + + case RequestCode.GET_CONSUMER_RUNNING_INFO: + return this.getConsumerRunningInfo(ctx, request); + + case RequestCode.CONSUME_MESSAGE_DIRECTLY: + return this.consumeMessageDirectly(ctx, request); + default: + break; + } + return null; + } + + @Override + public boolean rejectRequest() { + return false; + } + + public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final CheckTransactionStateRequestHeader requestHeader = + (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); + final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); + final MessageExt messageExt = MessageDecoder.decode(byteBuffer); + if (messageExt != null) { + final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); + if (group != null) { + MQProducerInner producer = this.mqClientFactory.selectProducer(group); + if (producer != null) { + final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); + producer.checkTransactionState(addr, messageExt, requestHeader); + } else { + log.debug("checkTransactionState, pick producer by group[{}] failed", group); + } + } else { + log.warn("checkTransactionState, pick producer group failed"); + } + } else { + log.warn("checkTransactionState, decode message failed"); + } + + return null; + } + + public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + try { + final NotifyConsumerIdsChangedRequestHeader requestHeader = + (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); + log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), + requestHeader.getConsumerGroup()); + this.mqClientFactory.rebalanceImmediately(); + } catch (Exception e) { + log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e)); + } + return null; + } + + public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final ResetOffsetRequestHeader requestHeader = + (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); + log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", + new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), + requestHeader.getTimestamp()}); + Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); + if (request.getBody() != null) { + ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); + offsetTable = body.getOffsetTable(); + } + this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable); + return null; + } + + @Deprecated + public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetConsumerStatusRequestHeader requestHeader = + (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); + + Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup()); + GetConsumerStatusBody body = new GetConsumerStatusBody(); + body.setMessageQueueTable(offsetTable); + response.setBody(body.encode()); + response.setCode(ResponseCode.SUCCESS); + return response; + } + + private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetConsumerRunningInfoRequestHeader requestHeader = + (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + + ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup()); + if (null != consumerRunningInfo) { + if (requestHeader.isJstackEnable()) { + Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); + String jstack = UtilAll.jstack(map); + consumerRunningInfo.setJstack(jstack); + } + + response.setCode(ResponseCode.SUCCESS); + response.setBody(consumerRunningInfo.encode()); + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup())); + } + + return response; + } + + private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final ConsumeMessageDirectlyResultRequestHeader requestHeader = + (ConsumeMessageDirectlyResultRequestHeader) request + .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); + + final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); + + ConsumeMessageDirectlyResult result = + this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName()); + + if (null != result) { + response.setCode(ResponseCode.SUCCESS); + response.setBody(result.encode()); + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup())); + } + + return response; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java new file mode 100644 index 0000000..0f57339 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl; + +/** + * @author shijia.wxr + */ +public enum CommunicationMode { + SYNC, + ASYNC, + ONEWAY, +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java new file mode 100644 index 0000000..56528ef --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl; + +/** + * @author shijia.wxr + */ +public class FindBrokerResult { + private final String brokerAddr; + private final boolean slave; + + + public FindBrokerResult(String brokerAddr, boolean slave) { + this.brokerAddr = brokerAddr; + this.slave = slave; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public boolean isSlave() { + return slave; + } +}