http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java new file mode 100644 index 0000000..269918a --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.offset; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.broker.BrokerPathConfigHelper; +import com.alibaba.rocketmq.common.ConfigManager; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerOffsetManager extends ConfigManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final String TOPIC_GROUP_SEPARATOR = "@"; + + private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = + new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + + private transient BrokerController brokerController; + + + public ConsumerOffsetManager() { + } + + + public ConsumerOffsetManager(BrokerController brokerController) { + this.brokerController = brokerController; + } + + + public void scanUnsubscribedTopic() { + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + String topic = arrays[0]; + String group = arrays[1]; + + if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) + && this.offsetBehindMuchThanData(topic, next.getValue())) { + it.remove(); + log.warn("remove topic offset, {}", topicAtGroup); + } + } + } + } + + + private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { + 
Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); + boolean result = !table.isEmpty(); + + while (it.hasNext() && result) { + Entry<Integer, Long> next = it.next(); + long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); + long offsetInPersist = next.getValue(); + if (offsetInPersist > minOffsetInStore) { + result = false; + } else { + result = true; + } + } + + return result; + } + + + public Set<String> whichTopicByConsumer(final String group) { + Set<String> topics = new HashSet<String>(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + if (group.equals(arrays[1])) { + topics.add(arrays[0]); + } + } + } + + return topics; + } + + + public Set<String> whichGroupByTopic(final String topic) { + Set<String> groups = new HashSet<String>(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + if (topic.equals(arrays[0])) { + groups.add(arrays[1]); + } + } + } + + return groups; + } + + + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + this.commitOffset(clientHost, key, queueId, offset); + } + + private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { + ConcurrentHashMap<Integer, Long> map = 
this.offsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<Integer, Long>(32); + map.put(queueId, offset); + this.offsetTable.put(key, map); + } else { + Long storeOffset = map.put(queueId, offset); + if (storeOffset != null && offset < storeOffset) { + log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); + } + } + } + + public long queryOffset(final String group, final String topic, final int queueId) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); + if (null != map) { + Long offset = map.get(queueId); + if (offset != null) + return offset; + } + + return -1; + } + + public String encode() { + return this.encode(false); + } + + @Override + public String configFilePath() { + return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); + if (obj != null) { + this.offsetTable = obj.offsetTable; + } + } + } + + public String encode(final boolean prettyFormat) { + return RemotingSerializable.toJson(this, prettyFormat); + } + + public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { + this.offsetTable = offsetTable; + } + + + public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) { + + Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>(); + Set<String> topicGroups = this.offsetTable.keySet(); + if (!UtilAll.isBlank(filterGroups)) { + for (String group : 
filterGroups.split(",")) { + Iterator<String> it = topicGroups.iterator(); + while (it.hasNext()) { + if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) { + it.remove(); + } + } + } + } + + for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { + String topicGroup = offSetEntry.getKey(); + String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); + if (topic.equals(topicGroupArr[0])) { + for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) { + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey()); + if (entry.getValue() >= minOffset) { + Long offset = queueMinOffset.get(entry.getKey()); + if (offset == null) { + queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue())); + } else { + queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset)); + } + } + } + } + + } + return queueMinOffset; + } + + + public Map<Integer, Long> queryOffset(final String group, final String topic) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + return this.offsetTable.get(key); + } + + + public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { + ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); + if (offsets != null) { + this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets)); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java new file mode 100644 index 0000000..f051d29 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.out; + +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult; +import com.alibaba.rocketmq.common.namesrv.TopAddressing; +import com.alibaba.rocketmq.common.protocol.RequestCode; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; +import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; +import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.RemotingClient; +import com.alibaba.rocketmq.remoting.exception.*; +import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; +import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + * @author manhong.yqd + */ +public class BrokerOuterAPI { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final RemotingClient remotingClient; + private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR); + private String nameSrvAddr = null; + + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { + this(nettyClientConfig, null); + } + + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { + this.remotingClient = new NettyRemotingClient(nettyClientConfig); + this.remotingClient.registerRPCHook(rpcHook); + } + + public void start() { + 
this.remotingClient.start(); + } + + public void shutdown() { + this.remotingClient.shutdown(); + } + + public String fetchNameServerAddr() { + try { + String addrs = this.topAddressing.fetchNSAddr(); + if (addrs != null) { + if (!addrs.equals(this.nameSrvAddr)) { + log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs); + this.updateNameServerAddressList(addrs); + this.nameSrvAddr = addrs; + return nameSrvAddr; + } + } + } catch (Exception e) { + log.error("fetchNameServerAddr Exception", e); + } + return nameSrvAddr; + } + + public void updateNameServerAddressList(final String addrs) { + List<String> lst = new ArrayList<String>(); + String[] addrArray = addrs.split(";"); + if (addrArray != null) { + for (String addr : addrArray) { + lst.add(addr); + } + + this.remotingClient.updateNameServerAddressList(lst); + } + } + + public RegisterBrokerResult registerBrokerAll( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills) { + RegisterBrokerResult registerBrokerResult = null; + + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + for (String namesrvAddr : nameServerAddressList) { + try { + RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, + haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); + if (result != null) { + registerBrokerResult = result; + } + + log.info("register broker to name server {} OK", namesrvAddr); + } catch (Exception e) { + log.warn("registerBroker Exception, " + namesrvAddr, e); + } + } + } + + return registerBrokerResult; + } + + private RegisterBrokerResult registerBroker( + final String namesrvAddr, + final String clusterName, + final 
String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills + ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { + RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + requestHeader.setHaServerAddr(haServerAddr); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); + + RegisterBrokerBody requestBody = new RegisterBrokerBody(); + requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); + requestBody.setFilterServerList(filterServerList); + request.setBody(requestBody.encode()); + + if (oneway) { + try { + this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); + } catch (RemotingTooMuchRequestException e) { + } + return null; + } + + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + RegisterBrokerResponseHeader responseHeader = + (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); + RegisterBrokerResult result = new RegisterBrokerResult(); + result.setMasterAddr(responseHeader.getMasterAddr()); + result.setHaServerAddr(responseHeader.getHaServerAddr()); + result.setHaServerAddr(responseHeader.getHaServerAddr()); + if (response.getBody() != null) { + result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); + } + return result; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), 
response.getRemark()); + } + + public void unregisterBrokerAll( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId + ) { + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + for (String namesrvAddr : nameServerAddressList) { + try { + this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); + log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); + } catch (Exception e) { + log.warn("unregisterBroker Exception, " + namesrvAddr, e); + } + } + } + } + + public void unregisterBroker( + final String namesrvAddr, + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId + ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), 
request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return new String(response.getBody(), MixAll.DEFAULT_CHARSET); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public void registerRPCHook(RPCHook rpcHook) { + remotingClient.registerRPCHook(rpcHook); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java new file mode 100644 index 0000000..8050bc1 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.pagecache; + +import com.alibaba.rocketmq.store.GetMessageResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final GetMessageResult getMessageResult; + private long transfered; // the bytes which was transfered already + + + public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) { + this.byteBufferHeader = byteBufferHeader; + this.getMessageResult = getMessageResult; + } + + + @Override + public long position() { + int pos = byteBufferHeader.position(); + List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + pos += bb.position(); + } + return pos; + } + + @Override + public long transfered() { + return transfered; + } + + @Override + public long count() { + return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else { + List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + if (bb.hasRemaining()) { + transfered += target.write(bb); + return transfered; + } + } + } + + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.getMessageResult.release(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java new file mode 100644 index 0000000..df742c5 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.pagecache; + +import com.alibaba.rocketmq.store.SelectMappedBufferResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + + +/** + * @author shijia.wxr + */ +public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final SelectMappedBufferResult selectMappedBufferResult; + private long transfered; // the bytes which was transfered already + + + public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) { + this.byteBufferHeader = byteBufferHeader; + this.selectMappedBufferResult = selectMappedBufferResult; + } + + + @Override + public long position() { + return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position(); + } + + @Override + public long transfered() { + return transfered; + } + + @Override + public long count() { + return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { + transfered += target.write(this.selectMappedBufferResult.getByteBuffer()); + return transfered; + } + + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.selectMappedBufferResult.release(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java new file mode 100644 index 0000000..cbcbc74 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.pagecache; + +import com.alibaba.rocketmq.store.QueryMessageResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final QueryMessageResult queryMessageResult; + private long transfered; // the bytes which was transfered already + + + public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) { + this.byteBufferHeader = byteBufferHeader; + this.queryMessageResult = queryMessageResult; + } + + + @Override + public long position() { + int pos = byteBufferHeader.position(); + List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + pos += bb.position(); + } + return pos; + } + + @Override + public long transfered() { + return transfered; + } + + @Override + public long count() { + return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else { + List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + if (bb.hasRemaining()) { + transfered += target.write(bb); + return transfered; + } + } + } + + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.queryMessageResult.release(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java new file mode 100644 index 0000000..141ba69 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package com.alibaba.rocketmq.broker.plugin; + +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.store.*; + +import java.util.HashMap; +import java.util.Set; +
/**
 * Pass-through {@link MessageStore} that serves as the base class for message store plugins.
 * <p>
 * Every {@link MessageStore} operation implemented here simply forwards its arguments to the
 * wrapped {@link #next} store and returns whatever that store returns. Subclasses can override
 * individual operations to add their own behaviour around the delegate.
 */
public abstract class AbstractPluginMessageStore implements MessageStore {
    /** The wrapped store that receives every forwarded call. */
    protected MessageStore next;
    /** The plugin context supplied at construction time. */
    protected MessageStorePluginContext context;

    /**
     * @param context the plugin context to keep for subclasses
     * @param next the store that all calls are forwarded to
     */
    public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
        this.context = context;
        this.next = next;
    }

    // ------------------------------------------------------------------ lifecycle

    @Override
    public boolean load() {
        return this.next.load();
    }

    @Override
    public void start() throws Exception {
        this.next.start();
    }

    @Override
    public void shutdown() {
        this.next.shutdown();
    }

    @Override
    public void destroy() {
        this.next.destroy();
    }

    // ------------------------------------------------------------------ writing and reading messages

    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        return this.next.putMessage(msg);
    }

    @Override
    public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
        int maxMsgNums, SubscriptionData subscriptionData) {
        return this.next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
    }

    @Override
    public MessageExt lookMessageByOffset(long commitLogOffset) {
        return this.next.lookMessageByOffset(commitLogOffset);
    }

    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        return this.next.selectOneMessageByOffset(commitLogOffset);
    }

    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
        return this.next.selectOneMessageByOffset(commitLogOffset, msgSize);
    }

    @Override
    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
        long end) {
        return this.next.queryMessage(topic, key, maxNum, begin, end);
    }

    // ------------------------------------------------------------------ queue offsets and times

    @Override
    public long getMaxOffsetInQuque(String topic, int queueId) {
        return this.next.getMaxOffsetInQuque(topic, queueId);
    }

    @Override
    public long getMinOffsetInQuque(String topic, int queueId) {
        return this.next.getMinOffsetInQuque(topic, queueId);
    }

    @Override
    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
        return this.next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
    }

    @Override
    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
        return this.next.getOffsetInQueueByTime(topic, queueId, timestamp);
    }

    @Override
    public long getEarliestMessageTime(String topic, int queueId) {
        return this.next.getEarliestMessageTime(topic, queueId);
    }

    @Override
    public long getEarliestMessageTime() {
        return this.next.getEarliestMessageTime();
    }

    @Override
    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
        return this.next.getMessageStoreTimeStamp(topic, queueId, offset);
    }

    @Override
    public long getMessageTotalInQueue(String topic, int queueId) {
        return this.next.getMessageTotalInQueue(topic, queueId);
    }

    @Override
    public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
        return this.next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
    }

    // ------------------------------------------------------------------ commit log

    @Override
    public long getMaxPhyOffset() {
        return this.next.getMaxPhyOffset();
    }

    @Override
    public long getMinPhyOffset() {
        return this.next.getMinPhyOffset();
    }

    @Override
    public SelectMappedBufferResult getCommitLogData(long offset) {
        return this.next.getCommitLogData(offset);
    }

    @Override
    public boolean appendToCommitLog(long startOffset, byte[] data) {
        return this.next.appendToCommitLog(startOffset, data);
    }

    @Override
    public boolean resetWriteOffset(long phyOffset) {
        return this.next.resetWriteOffset(phyOffset);
    }

    @Override
    public long getConfirmOffset() {
        return this.next.getConfirmOffset();
    }

    @Override
    public void setConfirmOffset(long phyOffset) {
        this.next.setConfirmOffset(phyOffset);
    }

    @Override
    public long flush() {
        return this.next.flush();
    }

    @Override
    public long dispatchBehindBytes() {
        return this.next.dispatchBehindBytes();
    }

    // ------------------------------------------------------------------ housekeeping

    @Override
    public void excuteDeleteFilesManualy() {
        this.next.excuteDeleteFilesManualy();
    }

    @Override
    public int cleanUnusedTopic(Set<String> topics) {
        return this.next.cleanUnusedTopic(topics);
    }

    @Override
    public void cleanExpiredConsumerQueue() {
        this.next.cleanExpiredConsumerQueue();
    }

    // ------------------------------------------------------------------ HA

    @Override
    public void updateHaMasterAddress(String newAddr) {
        this.next.updateHaMasterAddress(newAddr);
    }

    @Override
    public long slaveFallBehindMuch() {
        return this.next.slaveFallBehindMuch();
    }

    // ------------------------------------------------------------------ status and runtime information

    @Override
    public long now() {
        return this.next.now();
    }

    @Override
    public long lockTimeMills() {
        return this.next.lockTimeMills();
    }

    @Override
    public boolean isOSPageCacheBusy() {
        return this.next.isOSPageCacheBusy();
    }

    @Override
    public boolean isTransientStorePoolDeficient() {
        return this.next.isTransientStorePoolDeficient();
    }

    @Override
    public String getRunningDataInfo() {
        return this.next.getRunningDataInfo();
    }

    @Override
    public HashMap<String, String> getRuntimeInfo() {
        return this.next.getRuntimeInfo();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java new file mode 100644 index 0000000..84f5be7 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package com.alibaba.rocketmq.broker.plugin; + +import com.alibaba.rocketmq.store.MessageStore; + +import java.io.IOException; +import java.lang.reflect.Constructor; +
/**
 * Wraps a base {@link MessageStore} with the plugin classes named in the broker configuration.
 */
public final class MessageStoreFactory {
    /**
     * Builds the plugin chain around {@code messageStore}.
     * <p>
     * The plugin list is read from {@code context.getBrokerConfig().getMessageStorePlugIn()} as a
     * comma-separated list of class names. The list is processed from last to first, each plugin
     * wrapping the result of the previous step, so the first class named in the list ends up as
     * the outermost store. Each plugin class must expose a public constructor taking
     * {@code (MessageStorePluginContext, MessageStore)}.
     *
     * @param context plugin context; also the source of the broker configuration
     * @param messageStore the store to wrap
     * @return the outermost plugin store, or {@code messageStore} itself when no plugin is
     *         configured
     * @throws IOException declared for callers' benefit; nothing in this method throws it
     * @throws RuntimeException if a plugin class cannot be loaded or instantiated (the original
     *         failure is attached as the cause)
     */
    public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
        throws IOException {
        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
        if (plugin != null && plugin.trim().length() != 0) {
            String[] pluginClasses = plugin.split(",");
            for (int i = pluginClasses.length - 1; i >= 0; --i) {
                // Tolerate whitespace around the commas ("a.B, c.D"): Class.forName needs the
                // exact binary name and would otherwise fail on the leading blank.
                String pluginClass = pluginClasses[i].trim();
                try {
                    @SuppressWarnings("unchecked")
                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
                    Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
                    messageStore = construct.newInstance(context, messageStore);
                } catch (Throwable e) {
                    throw new RuntimeException(String.format(
                        "Initialize plugin's class %s not found!", pluginClass), e);
                }
            }
        }
        return messageStore;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java new file mode 100644 index 0000000..15e8b07 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package com.alibaba.rocketmq.broker.plugin; + +import com.alibaba.rocketmq.common.BrokerConfig; +import com.alibaba.rocketmq.store.MessageArrivingListener; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import com.alibaba.rocketmq.store.stats.BrokerStatsManager; +
/**
 * Bundle of the four objects handed to a message store plugin: the store configuration, the
 * broker statistics manager, the message-arriving listener and the broker configuration.
 * <p>
 * The values are set once in the constructor and exposed through getters only.
 */
public class MessageStorePluginContext {
    private MessageStoreConfig messageStoreConfig;
    private BrokerStatsManager brokerStatsManager;
    private MessageArrivingListener messageArrivingListener;
    private BrokerConfig brokerConfig;

    /**
     * Stores the given references as-is; no argument is validated or copied.
     */
    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
        BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
        BrokerConfig brokerConfig) {
        this.brokerConfig = brokerConfig;
        this.messageArrivingListener = messageArrivingListener;
        this.brokerStatsManager = brokerStatsManager;
        this.messageStoreConfig = messageStoreConfig;
    }

    /** @return the store configuration passed to the constructor */
    public MessageStoreConfig getMessageStoreConfig() {
        return this.messageStoreConfig;
    }

    /** @return the statistics manager passed to the constructor */
    public BrokerStatsManager getBrokerStatsManager() {
        return this.brokerStatsManager;
    }

    /** @return the message-arriving listener passed to the constructor */
    public MessageArrivingListener getMessageArrivingListener() {
        return this.messageArrivingListener;
    }

    /** @return the broker configuration passed to the constructor */
    public BrokerConfig getBrokerConfig() {
        return this.brokerConfig;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java new file mode 100644 index 0000000..95db52d --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.processor; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext; +import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.TopicFilterType; +import com.alibaba.rocketmq.common.constant.DBMsgConstants; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.constant.PermName; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.protocol.RequestCode; +import com.alibaba.rocketmq.common.protocol.ResponseCode; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader; +import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; +import
com.alibaba.rocketmq.common.sysflag.TopicSysFlag; +import com.alibaba.rocketmq.common.utils.ChannelUtil; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; +import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import com.alibaba.rocketmq.store.MessageExtBrokerInner; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Random; + +
/**
 * Shared helpers for processors that handle send-message requests: request validation, topic
 * lookup/creation, conversion of a request into a {@link MessageExtBrokerInner}, and invocation
 * of the registered {@link SendMessageHook}s.
 *
 * @author shijia.wxr
 */
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    protected final static int DLQ_NUMS_PER_GROUP = 1;
    protected final BrokerController brokerController;
    protected final Random random = new Random(System.currentTimeMillis());
    /** Broker IP1 plus the Netty listen port, captured once at construction. */
    protected final SocketAddress storeHost;
    /** Hooks run around each send; stays {@code null} until {@link #registerSendMessageHook}. */
    private List<SendMessageHook> sendMessageHookList;


    /**
     * @param brokerController controller giving access to the broker and Netty server
     *        configuration, the topic config manager and the broker address
     */
    public AbstractSendMessageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
        this.storeHost =
            new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
                .getNettyServerConfig().getListenPort());
    }

    /**
     * Creates the trace context passed to the send-message hooks.
     * <p>
     * Side effect: the request header's property string is rewritten so that it additionally
     * carries the broker's region id and trace switch.
     *
     * @return the populated context, or {@code null} when no hook is registered
     */
    protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
        SendMessageRequestHeader requestHeader) {
        if (!this.hasSendMessageHook()) {
            return null;
        }
        SendMessageContext mqtraceContext;
        mqtraceContext = new SendMessageContext();
        mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
        mqtraceContext.setTopic(requestHeader.getTopic());
        mqtraceContext.setMsgProps(requestHeader.getProperties());
        mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
        mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
        mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp());

        Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));


        // The context always carries a non-null unique key.
        if (uniqueKey == null) {
            uniqueKey = "";
        }
        mqtraceContext.setMsgUniqueKey(uniqueKey);
        return mqtraceContext;
    }

    /**
     * @return {@code true} when at least one send-message hook has been registered
     */
    public boolean hasSendMessageHook() {
        return sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
    }

    /**
     * Converts a send request into the broker-internal message representation.
     * <p>
     * A negative queue id in the header is replaced by a randomly chosen id below the topic's
     * write queue count. The multi-tag system flag is added when the topic uses
     * {@link TopicFilterType#MULTI_TAG}. A missing reconsume count is stored as 0.
     */
    protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
        int queueIdInt = requestHeader.getQueueId();
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }
        int sysFlag = requestHeader.getSysFlag();

        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner,
            MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
            msgInner.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(sysFlag);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
            .getReconsumeTimes());
        return msgInner;
    }

    /**
     * @return the store host address built in the constructor
     */
    public SocketAddress getStoreHost() {
        return storeHost;
    }

    /**
     * Validates the size of the topic name, the property string and the body.
     * <p>
     * On the first violated limit the response code is set to
     * {@link ResponseCode#MESSAGE_ILLEGAL}; otherwise the response is left untouched.
     *
     * @return the {@code response} argument in every case
     */
    protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, RemotingCommand request,
        final RemotingCommand response) {
        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            return response;
        }
        if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long "
                + requestHeader.getProperties().length());
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            return response;
        }
        if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
            log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(),
                request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
            response.setRemark("msg body must be less 64KB");
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            return response;
        }
        return response;
    }

    /**
     * Checks that the request may be written to its topic and queue, creating the topic
     * configuration on the fly when it does not exist yet.
     * <p>
     * Failure cases and the response code they set:
     * <ul>
     * <li>broker not writeable and the topic is an order topic: {@code NO_PERMISSION}</li>
     * <li>topic rejected by {@code isTopicCanSendMessage}: {@code SYSTEM_ERROR}</li>
     * <li>topic unknown and could not be created: {@code TOPIC_NOT_EXIST}</li>
     * <li>queue id not below max(write queues, read queues): {@code SYSTEM_ERROR}</li>
     * </ul>
     * When every check passes the response is left untouched.
     *
     * @return the {@code response} argument in every case
     */
    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending message is forbidden");
            return response;
        }
        if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
            String errorMsg =
                "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
            log.warn(errorMsg);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorMsg);
            return response;
        }

        TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            int topicSysFlag = 0;
            if (requestHeader.isUnitMode()) {
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                } else {
                    topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
                }
            }

            log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
                + ctx.channel().remoteAddress());
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
                requestHeader.getTopic(), //
                requestHeader.getDefaultTopic(), //
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

            // Second attempt, for retry-group topics only: create it as a single-queue
            // read/write topic.
            if (null == topicConfig) {
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    topicConfig =
                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                            requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                            topicSysFlag);
                }
            }

            if (null == topicConfig) {
                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                return response;
            }
        }

        int queueIdInt = requestHeader.getQueueId();
        int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
        if (queueIdInt >= idValid) {
            String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
                queueIdInt,
                topicConfig.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

            log.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);

            return response;
        }
        return response;
    }

    /**
     * Replaces the list of hooks run around each send.
     */
    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
        this.sendMessageHookList = sendMessageHookList;
    }

    /**
     * Writes the response to the channel unless the request is a one-way RPC. A failure while
     * writing is logged together with the request and response, and is not propagated.
     */
    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
        final RemotingCommand response) {
        if (!request.isOnewayRPC()) {
            try {
                ctx.writeAndFlush(response);
            } catch (Throwable e) {
                log.error("SendMessageProcessor process request over, but response failed", e);
                log.error(request.toString());
                log.error(response.toString());
            }
        }
    }

    /**
     * Runs {@code sendMessageBefore} on every registered hook.
     * <p>
     * For each hook the request header is decoded, copied into {@code context}, the hook is
     * invoked, and the (possibly modified) properties are written back to the decoded header.
     * A failing hook is logged and does not prevent the remaining hooks from running.
     */
    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
        SendMessageContext context) {
        if (hasSendMessageHook()) {
            for (SendMessageHook hook : this.sendMessageHookList) {
                try {
                    final SendMessageRequestHeader requestHeader = parseRequestHeader(request);

                    if (null != requestHeader) {
                        context.setProducerGroup(requestHeader.getProducerGroup());
                        context.setTopic(requestHeader.getTopic());
                        context.setBodyLength(request.getBody().length);
                        context.setMsgProps(requestHeader.getProperties());
                        context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                        context.setBrokerAddr(this.brokerController.getBrokerAddr());
                        context.setQueueId(requestHeader.getQueueId());
                    }

                    hook.sendMessageBefore(context);
                    // parseRequestHeader returns null for request codes other than the two
                    // send codes; there is then no header to write the properties back to.
                    if (null != requestHeader) {
                        requestHeader.setProperties(context.getMsgProps());
                    }
                } catch (Throwable e) {
                    // Hooks are best-effort: record the failure and continue with the next hook.
                    log.warn("SendMessageHook sendMessageBefore failed", e);
                }
            }
        }
    }

    /**
     * Decodes the send-message header of a request.
     *
     * @return the decoded header (a V2 header is converted to the V1 form), or {@code null}
     *         when the request code is neither {@code SEND_MESSAGE_V2} nor {@code SEND_MESSAGE}
     * @throws RemotingCommandException if decoding the custom header fails
     */
    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
        throws RemotingCommandException {

        SendMessageRequestHeaderV2 requestHeaderV2 = null;
        SendMessageRequestHeader requestHeader = null;
        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE_V2:
                requestHeaderV2 =
                    (SendMessageRequestHeaderV2) request
                        .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
                // Intentional fall-through: the next case converts the V2 header to V1.
            case RequestCode.SEND_MESSAGE:
                if (null == requestHeaderV2) {
                    requestHeader =
                        (SendMessageRequestHeader) request
                            .decodeCommandCustomHeader(SendMessageRequestHeader.class);
                } else {
                    requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
                }
                // Intentional fall-through into default, which only breaks.
            default:
                break;
        }
        return requestHeader;
    }

    /**
     * Runs {@code sendMessageAfter} on every registered hook.
     * <p>
     * When a response is available, its message id, queue id, queue offset, code and remark are
     * copied into {@code context} first. A failing hook is logged and does not prevent the
     * remaining hooks from running.
     */
    public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
        if (hasSendMessageHook()) {
            for (SendMessageHook hook : this.sendMessageHookList) {
                try {
                    if (response != null) {
                        final SendMessageResponseHeader responseHeader =
                            (SendMessageResponseHeader) response.readCustomHeader();
                        // Guard against a response without a custom header so that the hook
                        // below is still invoked with the code and remark.
                        if (responseHeader != null) {
                            context.setMsgId(responseHeader.getMsgId());
                            context.setQueueId(responseHeader.getQueueId());
                            context.setQueueOffset(responseHeader.getQueueOffset());
                        }
                        context.setCode(response.getCode());
                        context.setErrorMsg(response.getRemark());
                    }
                    hook.sendMessageAfter(context);
                } catch (Throwable e) {
                    // Hooks are best-effort: record the failure and continue with the next hook.
                    log.warn("SendMessageHook sendMessageAfter failed", e);
                }
            }
        }
    }

    /**
     * @return always {@code false}; this processor never rejects a request up front
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }
}
