http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java deleted file mode 100644 index 9df2be3..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.consumer.store; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.FindBrokerResult; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Remote storage implementation - * - * @author shijia.wxr - */ -public class RemoteBrokerOffsetStore implements OffsetStore { - private final static Logger log = ClientLogger.getLog(); - private final MQClientInstance mQClientFactory; - private final String groupName; - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(); - - - public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { - this.mQClientFactory = mQClientFactory; - this.groupName = groupName; - } - - - @Override - public void load() { - } - - - @Override - public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { - if (mq != null) { - AtomicLong offsetOld = this.offsetTable.get(mq); - if (null == offsetOld) { - offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); - } - - if (null != offsetOld) { - if (increaseOnly) { - MixAll.compareAndIncreaseOnly(offsetOld, offset); - } else { - offsetOld.set(offset); - } - } - } - } - - - @Override - public long readOffset(final MessageQueue mq, final ReadOffsetType type) { - if (mq != null) { - switch (type) { - case MEMORY_FIRST_THEN_STORE: - case READ_FROM_MEMORY: { - AtomicLong offset = this.offsetTable.get(mq); - if (offset != null) { - return offset.get(); - } else if (ReadOffsetType.READ_FROM_MEMORY == type) { - return -1; - } - } - case READ_FROM_STORE: { - try { - long brokerOffset = this.fetchConsumeOffsetFromBroker(mq); - AtomicLong offset = new AtomicLong(brokerOffset); - this.updateOffset(mq, offset.get(), false); - return brokerOffset; - } - // No offset in broker - catch (MQBrokerException e) { - return -1; - } - //Other exceptions - catch (Exception e) { - log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e); - return -2; - } - } - default: - break; - } - } - - return -1; - } - - - @Override - public void persistAll(Set<MessageQueue> mqs) { - if (null == mqs || mqs.isEmpty()) - return; - - final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); - if (mqs != null && !mqs.isEmpty()) { - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { - MessageQueue mq = entry.getKey(); - AtomicLong offset = entry.getValue(); - if (offset != null) { - if (mqs.contains(mq)) { - try { - this.updateConsumeOffsetToBroker(mq, offset.get()); - log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", - this.groupName, - this.mQClientFactory.getClientId(), - mq, - offset.get()); - } catch (Exception e) { - log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); - } - } else { - unusedMQ.add(mq); - } - } - } - } - - if (!unusedMQ.isEmpty()) { - for (MessageQueue mq : unusedMQ) { - this.offsetTable.remove(mq); - log.info("remove unused mq, {}, {}", mq, this.groupName); - } - } - } - - - @Override - public void persist(MessageQueue mq) { - AtomicLong offset = this.offsetTable.get(mq); - if (offset != null) { - try { - this.updateConsumeOffsetToBroker(mq, offset.get()); - log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", - this.groupName, - this.mQClientFactory.getClientId(), - mq, - offset.get()); - } catch (Exception e) { - log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); - } - } - } - - public void removeOffset(MessageQueue mq) { - if (mq != null) { - this.offsetTable.remove(mq); - log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq, - offsetTable.size()); - } - } - - @Override - public Map<MessageQueue, Long> cloneOffsetTable(String topic) { - Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>(); - for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { - MessageQueue mq = entry.getKey(); - if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) { - continue; - } - cloneOffsetTable.put(mq, entry.getValue().get()); - } - return cloneOffsetTable; - } - - /** - * Update the Consumer Offset in one way, once the Master is off, updated to Slave, - * here need to be optimized. - */ - private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { - updateConsumeOffsetToBroker(mq, offset, true); - } - - /** - * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, - * here need to be optimized. - */ - @Override - public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, - MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - } - - if (findBrokerResult != null) { - UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); - requestHeader.setTopic(mq.getTopic()); - requestHeader.setConsumerGroup(this.groupName); - requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setCommitOffset(offset); - - if (isOneway) { - this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); - } else { - this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); - } - } else { - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - } - - private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, - InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - } - - if (findBrokerResult != null) { - QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); - requestHeader.setTopic(mq.getTopic()); - requestHeader.setConsumerGroup(this.groupName); - requestHeader.setQueueId(mq.getQueueId()); - - return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); - } else { - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java deleted file mode 100644 index 7fc09f8..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.exception; - -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.help.FAQUrl; - - -/** - * @author shijia.wxr - */ -public class MQBrokerException extends Exception { - private static final long serialVersionUID = 5975020272601250368L; - private final int responseCode; - private final String errorMessage; - - - public MQBrokerException(int responseCode, String errorMessage) { - super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage)); - this.responseCode = responseCode; - this.errorMessage = errorMessage; - } - - - public int getResponseCode() { - return responseCode; - } - - - public String getErrorMessage() { - return errorMessage; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java deleted file mode 100644 index f343a67..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.exception; - -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.help.FAQUrl; - - -/** - * @author shijia.wxr - */ -public class MQClientException extends Exception { - private static final long serialVersionUID = -5758410930844185841L; - private int responseCode; - private String errorMessage; - - - public MQClientException(String errorMessage, Throwable cause) { - super(FAQUrl.attachDefaultURL(errorMessage), cause); - this.responseCode = -1; - this.errorMessage = errorMessage; - } - - - public MQClientException(int responseCode, String errorMessage) { - super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: " - + errorMessage)); - this.responseCode = responseCode; - this.errorMessage = errorMessage; - } - - public int getResponseCode() { - return responseCode; - } - - public MQClientException setResponseCode(final int responseCode) { - this.responseCode = responseCode; - return this; - } - - public String getErrorMessage() { - return errorMessage; - } - - public void setErrorMessage(final String errorMessage) { - this.errorMessage = errorMessage; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java deleted file mode 100644 index 6291803..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; - - -/** - * @author manhong.yqd - */ -public class CheckForbiddenContext { - private String nameSrvAddr; - private String group; - private Message message; - private MessageQueue mq; - private String brokerAddr; - private CommunicationMode communicationMode; - private SendResult sendResult; - private Exception exception; - private Object arg; - private boolean unitMode = false; - - - public String getGroup() { - return group; - } - - - public void setGroup(String group) { - this.group = group; - } - - - public Message getMessage() { - return message; - } - - - public void setMessage(Message message) { - this.message = message; - } - - - public MessageQueue getMq() { - return mq; - } - - - public void setMq(MessageQueue mq) { - this.mq = mq; - } - - - public String getBrokerAddr() { - return brokerAddr; - } - - - public void setBrokerAddr(String brokerAddr) { - this.brokerAddr = brokerAddr; - } - - - public CommunicationMode getCommunicationMode() { - return communicationMode; - } - - - public void setCommunicationMode(CommunicationMode communicationMode) { - this.communicationMode = communicationMode; - } - - - public SendResult getSendResult() { - return sendResult; - } - - - public void setSendResult(SendResult sendResult) { - this.sendResult = sendResult; - } - - - public Exception getException() { - return exception; - } - - - public void setException(Exception exception) { - this.exception = exception; - } - - - public Object getArg() { - return arg; - } - - - public void setArg(Object arg) { - this.arg = arg; - } - - - public boolean isUnitMode() { - return unitMode; - } - - - public void setUnitMode(boolean isUnitMode) { - this.unitMode = isUnitMode; - } - - - public String getNameSrvAddr() { - return nameSrvAddr; - } - - - public void setNameSrvAddr(String nameSrvAddr) { - this.nameSrvAddr = nameSrvAddr; - } - - - @Override - public String toString() { - return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message - + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode - + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode - + ", arg=" + arg + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java deleted file mode 100644 index 35a2740..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.client.hook; - -import com.alibaba.rocketmq.client.exception.MQClientException; - - -/** - * @author manhong.yqd - */ -public interface CheckForbiddenHook { - public String hookName(); - - - public void checkForbidden(final CheckForbiddenContext context) throws MQClientException; -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java deleted file mode 100644 index 0c0e7cd..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; -import java.util.Map; - - -public class ConsumeMessageContext { - private String consumerGroup; - private List<MessageExt> msgList; - private MessageQueue mq; - private boolean success; - private String status; - private Object mqTraceContext; - private Map<String, String> props; - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public List<MessageExt> getMsgList() { - return msgList; - } - - - public void setMsgList(List<MessageExt> msgList) { - this.msgList = msgList; - } - - - public MessageQueue getMq() { - return mq; - } - - - public void setMq(MessageQueue mq) { - this.mq = mq; - } - - - public boolean isSuccess() { - return success; - } - - - public void setSuccess(boolean success) { - this.success = success; - } - - - public Object getMqTraceContext() { - return mqTraceContext; - } - - - public void setMqTraceContext(Object mqTraceContext) { - this.mqTraceContext = mqTraceContext; - } - - - public Map<String, String> getProps() { - return props; - } - - - public void setProps(Map<String, String> props) { - this.props = props; - } - - - public String getStatus() { - return status; - } - - - public void setStatus(String status) { - this.status = status; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java deleted file mode 100644 index 96b0e53..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -public interface ConsumeMessageHook { - String hookName(); - - void consumeMessageBefore(final ConsumeMessageContext context); - - void consumeMessageAfter(final ConsumeMessageContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java deleted file mode 100644 index c47f09e..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.List; - - -/** - * @author manhong.yqd - */ -public class FilterMessageContext { - private String consumerGroup; - private List<MessageExt> msgList; - private MessageQueue mq; - private Object arg; - private boolean unitMode; - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public List<MessageExt> getMsgList() { - return msgList; - } - - - public void setMsgList(List<MessageExt> msgList) { - this.msgList = msgList; - } - - - public MessageQueue getMq() { - return mq; - } - - - public void setMq(MessageQueue mq) { - this.mq = mq; - } - - - public Object getArg() { - return arg; - } - - - public void setArg(Object arg) { - this.arg = arg; - } - - - public boolean isUnitMode() { - return unitMode; - } - - - public void setUnitMode(boolean isUnitMode) { - this.unitMode = isUnitMode; - } - - - @Override - public String toString() { - return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq=" - + mq + ", arg=" + arg + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java deleted file mode 100644 index 1528ef9..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -/** - * @author manhong.yqd - */ -public interface FilterMessageHook { - public String hookName(); - - - public void filterMessage(final FilterMessageContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java deleted file mode 100644 index 9552456..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl; -import com.alibaba.rocketmq.client.producer.SendResult; -import com.alibaba.rocketmq.common.message.Message; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.message.MessageType; - -import java.util.Map; - - -public class SendMessageContext { - private String producerGroup; - private Message message; - private MessageQueue mq; - private String brokerAddr; - private String bornHost; - private CommunicationMode communicationMode; - private SendResult sendResult; - private Exception exception; - private Object mqTraceContext; - private Map<String, String> props; - private DefaultMQProducerImpl producer; - private MessageType msgType = MessageType.Normal_Msg; - - public MessageType getMsgType() { - return msgType; - } - - public void setMsgType(final MessageType msgType) { - this.msgType = msgType; - } - - public DefaultMQProducerImpl getProducer() { - return producer; - } - - public void setProducer(final DefaultMQProducerImpl producer) { - this.producer = producer; - } - - public String getProducerGroup() { - return producerGroup; - } - - - public void setProducerGroup(String producerGroup) { - this.producerGroup = producerGroup; - } - - - public Message getMessage() { - return message; - } - - - public void setMessage(Message message) { - this.message = message; - } - - - public MessageQueue getMq() { - return mq; - } - - - public void setMq(MessageQueue mq) { - this.mq = mq; - } - - - public String getBrokerAddr() { - return brokerAddr; - } - - - public void setBrokerAddr(String brokerAddr) { - this.brokerAddr = brokerAddr; - } - - - public CommunicationMode getCommunicationMode() { - return communicationMode; - } - - - public void setCommunicationMode(CommunicationMode communicationMode) { - this.communicationMode = communicationMode; - } - - - public SendResult getSendResult() { - return sendResult; - } - - - public void setSendResult(SendResult sendResult) { - this.sendResult = sendResult; - } - - - public Exception getException() { - return exception; - } - - - public void setException(Exception exception) { - this.exception = exception; - } - - - public Object getMqTraceContext() { - return mqTraceContext; - } - - - public void setMqTraceContext(Object mqTraceContext) { - this.mqTraceContext = mqTraceContext; - } - - - public Map<String, String> getProps() { - return props; - } - - - public void setProps(Map<String, String> props) { - this.props = props; - } - - - public String getBornHost() { - return bornHost; - } - - - public void setBornHost(String bornHost) { - this.bornHost = bornHost; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java deleted file mode 100644 index 22e1fb3..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.hook; - -public interface SendMessageHook { - String hookName(); - - void sendMessageBefore(final SendMessageContext context); - - void sendMessageAfter(final SendMessageContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java deleted file mode 100644 index 79a539e..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl; - -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.impl.producer.MQProducerInner; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.message.MessageConst; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody; -import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody; -import com.alibaba.rocketmq.common.protocol.header.*; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - - -/** - * @author shijia.wxr - */ -public class ClientRemotingProcessor implements NettyRequestProcessor { - private final Logger log = ClientLogger.getLog(); - private final MQClientInstance mqClientFactory; - - - public ClientRemotingProcessor(final MQClientInstance mqClientFactory) { - this.mqClientFactory = mqClientFactory; - } - - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - switch (request.getCode()) { - case RequestCode.CHECK_TRANSACTION_STATE: - return this.checkTransactionState(ctx, request); - case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: - return this.notifyConsumerIdsChanged(ctx, request); - case RequestCode.RESET_CONSUMER_CLIENT_OFFSET: - return this.resetOffset(ctx, request); - case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT: - return this.getConsumeStatus(ctx, request); - - case RequestCode.GET_CONSUMER_RUNNING_INFO: - return this.getConsumerRunningInfo(ctx, request); - - case RequestCode.CONSUME_MESSAGE_DIRECTLY: - return this.consumeMessageDirectly(ctx, request); - default: - break; - } - return null; - } - - @Override - public boolean rejectRequest() { - return false; - } - - public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final CheckTransactionStateRequestHeader requestHeader = - (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); - final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); - final MessageExt messageExt = MessageDecoder.decode(byteBuffer); - if (messageExt != null) { - final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); - if (group != null) { - MQProducerInner producer = this.mqClientFactory.selectProducer(group); - if (producer != null) { - final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - producer.checkTransactionState(addr, messageExt, requestHeader); - } else { - log.debug("checkTransactionState, pick producer by group[{}] failed", group); - } - } else { - log.warn("checkTransactionState, pick producer group failed"); - } - } else { - log.warn("checkTransactionState, decode message failed"); - } - - return null; - } - - public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - try { - final NotifyConsumerIdsChangedRequestHeader requestHeader = - (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); - log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately", - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), - requestHeader.getConsumerGroup()); - this.mqClientFactory.rebalanceImmediately(); - } catch (Exception e) { - log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e)); - } - return null; - } - - public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final ResetOffsetRequestHeader requestHeader = - (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); - log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", - new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp()}); - Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); - if (request.getBody() != null) { - ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); - offsetTable = body.getOffsetTable(); - } - this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable); - return null; - } - - @Deprecated - public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetConsumerStatusRequestHeader requestHeader = - (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); - - Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup()); - GetConsumerStatusBody body = new GetConsumerStatusBody(); - body.setMessageQueueTable(offsetTable); - response.setBody(body.encode()); - response.setCode(ResponseCode.SUCCESS); - return response; - } - - private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); - - ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup()); - if (null != consumerRunningInfo) { - if (requestHeader.isJstackEnable()) { - Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); - String jstack = UtilAll.jstack(map); - consumerRunningInfo.setJstack(jstack); - } - - response.setCode(ResponseCode.SUCCESS); - response.setBody(consumerRunningInfo.encode()); - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup())); - } - - return response; - } - - private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final ConsumeMessageDirectlyResultRequestHeader requestHeader = - (ConsumeMessageDirectlyResultRequestHeader) request - .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); - - final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); - - ConsumeMessageDirectlyResult result = - this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName()); - - if (null != result) { - response.setCode(ResponseCode.SUCCESS); - response.setBody(result.encode()); - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup())); - } - - return response; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java deleted file mode 100644 index bc2f95d..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl; - -/** - * @author shijia.wxr - */ -public enum CommunicationMode { - SYNC, - ASYNC, - ONEWAY, -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java deleted file mode 100644 index 22805cd..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl; - -/** - * @author shijia.wxr - */ -public class FindBrokerResult { - private final String brokerAddr; - private final boolean slave; - - - public FindBrokerResult(String brokerAddr, boolean slave) { - this.brokerAddr = brokerAddr; - this.slave = slave; - } - - - public String getBrokerAddr() { - return brokerAddr; - } - - - public boolean isSlave() { - return slave; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java deleted file mode 100644 index 9f7e964..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java +++ /dev/null @@ -1,418 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl; - -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader; -import com.alibaba.rocketmq.common.protocol.route.BrokerData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; -import com.alibaba.rocketmq.remoting.InvokeCallback; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import com.alibaba.rocketmq.remoting.netty.ResponseFuture; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import org.slf4j.Logger; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class MQAdminImpl { - - private final Logger log = ClientLogger.getLog(); - private final MQClientInstance mQClientFactory; - private long timeoutMillis = 6000; - - - public MQAdminImpl(MQClientInstance mQClientFactory) { - this.mQClientFactory = mQClientFactory; - } - - - public long getTimeoutMillis() { - return timeoutMillis; - } - - - public void setTimeoutMillis(long timeoutMillis) { - this.timeoutMillis = timeoutMillis; - } - - - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - try { - TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis); - List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas(); - if (brokerDataList != null && !brokerDataList.isEmpty()) { - Collections.sort(brokerDataList); - - boolean createOKAtLeastOnce = false; - MQClientException exception = null; - - StringBuilder orderTopicString = new StringBuilder(); - - for (BrokerData brokerData : brokerDataList) { - String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); - if (addr != null) { - TopicConfig topicConfig = new TopicConfig(newTopic); - topicConfig.setReadQueueNums(queueNum); - topicConfig.setWriteQueueNums(queueNum); - topicConfig.setTopicSysFlag(topicSysFlag); - - boolean createOK = false; - for (int i = 0; i < 5; i++) { - try { - this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis); - createOK = true; - createOKAtLeastOnce = true; - break; - } catch (Exception e) { - if (4 == i) { - exception = new MQClientException("create topic to broker exception", e); - } - } - } - - if (createOK) { - orderTopicString.append(brokerData.getBrokerName()); - orderTopicString.append(":"); - orderTopicString.append(queueNum); - orderTopicString.append(";"); - } - } - } - - if (exception != null && !createOKAtLeastOnce) { - throw exception; - } - } else { - throw new MQClientException("Not found broker, maybe key is wrong", null); - } - } catch (Exception e) { - throw new MQClientException("create new topic failed", e); - } - } - - - public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException { - try { - TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); - if (topicRouteData != null) { - TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData); - if (topicPublishInfo != null && topicPublishInfo.ok()) { - return topicPublishInfo.getMessageQueueList(); - } - } - } catch (Exception e) { - throw new MQClientException("Can not find Message Queue for this topic, " + topic, e); - } - - throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); - } - - - public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { - try { - TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); - if (topicRouteData != null) { - Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); - if (!mqList.isEmpty()) { - return mqList; - } else { - throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); - } - } - } catch (Exception e) { - throw new MQClientException( - "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), // - e); - } - - throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); - } - - - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (brokerAddr != null) { - try { - return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, - timeoutMillis); - } catch (Exception e) { - throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - - public long maxOffset(MessageQueue mq) throws MQClientException { - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (brokerAddr != null) { - try { - return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); - } catch (Exception e) { - throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - - public long minOffset(MessageQueue mq) throws MQClientException { - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (brokerAddr != null) { - try { - return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis); - } catch (Exception e) { - throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - if (null == brokerAddr) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (brokerAddr != null) { - try { - return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(), - timeoutMillis); - } catch (Exception e) { - throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); - } - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - - MessageId messageId = null; - try { - messageId = MessageDecoder.decodeMessageId(msgId); - } catch (Exception e) { - throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message."); - } - return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()), - messageId.getOffset(), timeoutMillis); - } - - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, - InterruptedException { - return queryMessage(topic, key, maxNum, begin, end, false); - } - - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException { - - QueryResult qr = this.queryMessage(topic, uniqKey, 32, - MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true); - if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { - return qr.getMessageList().get(0); - } else { - return null; - } - } - - protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException, - InterruptedException { - TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); - if (null == topicRouteData) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); - } - - if (topicRouteData != null) { - List<String> brokerAddrs = new LinkedList<String>(); - for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - String addr = brokerData.selectBrokerAddr(); - if (addr != null) { - brokerAddrs.add(addr); - } - } - - if (!brokerAddrs.isEmpty()) { - final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size()); - final List<QueryResult> queryResultList = new LinkedList<QueryResult>(); - - for (String addr : brokerAddrs) { - try { - QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setKey(key); - requestHeader.setMaxNum(maxNum); - requestHeader.setBeginTimestamp(begin); - requestHeader.setEndTimestamp(end); - - this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3, - new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - try { - RemotingCommand response = responseFuture.getResponseCommand(); - if (response != null) { - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - QueryMessageResponseHeader responseHeader = null; - try { - responseHeader = - (QueryMessageResponseHeader) response - .decodeCommandCustomHeader(QueryMessageResponseHeader.class); - } catch (RemotingCommandException e) { - log.error("decodeCommandCustomHeader exception", e); - return; - } - - List<MessageExt> wrappers = - MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); - - QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); - queryResultList.add(qr); - break; - } - default: - log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); - break; - } - } else { - log.warn("getResponseCommand return null"); - } - } finally { - countDownLatch.countDown(); - } - } - }, isUniqKey); - } catch (Exception e) { - log.warn("queryMessage exception", e); - } - - } - - boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS); - if (!ok) { - log.warn("queryMessage, maybe some broker failed"); - } - - long indexLastUpdateTimestamp = 0; - List<MessageExt> messageList = new LinkedList<MessageExt>(); - for (QueryResult qr : queryResultList) { - if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) { - indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp(); - } - - for (MessageExt msgExt : qr.getMessageList()) { - if (isUniqKey) { - if (msgExt.getMsgId().equals(key)) { - - if (messageList.size() > 0) { - - if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) { - - messageList.clear(); - messageList.add(msgExt); - } - - } else { - - messageList.add(msgExt); - } - } else { - log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString()); - } - } else { - String keys = msgExt.getKeys(); - if (keys != null) { - boolean matched = false; - String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR); - if (keyArray != null) { - for (String k : keyArray) { - if (key.equals(k)) { - matched = true; - break; - } - } - } - - if (matched) { - messageList.add(msgExt); - } else { - log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString()); - } - } - } - } - } - - if (!messageList.isEmpty()) { - return new QueryResult(indexLastUpdateTimestamp, messageList); - } else { - throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message."); - } - } - } - - throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info"); - } -}
