http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java deleted file mode 100644 index 269918a..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.offset; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.BrokerPathConfigHelper; -import com.alibaba.rocketmq.common.ConfigManager; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class ConsumerOffsetManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final String TOPIC_GROUP_SEPARATOR = "@"; - - private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = - new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); - - private transient BrokerController brokerController; - - - public ConsumerOffsetManager() { - } - - - public ConsumerOffsetManager(BrokerController brokerController) { - this.brokerController = brokerController; - } - - - public void scanUnsubscribedTopic() { - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); - String topicAtGroup = next.getKey(); - String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { - String topic = arrays[0]; - String group = arrays[1]; - - if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) - && this.offsetBehindMuchThanData(topic, next.getValue())) { - it.remove(); - log.warn("remove topic offset, {}", topicAtGroup); - } - } - } - } - - - private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { - Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); - boolean result = !table.isEmpty(); - - while (it.hasNext() && result) { - Entry<Integer, Long> next = it.next(); - long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); - long offsetInPersist = next.getValue(); - if (offsetInPersist > minOffsetInStore) { - result = false; - } else { - result = true; - } - } - - return result; - } - - - public Set<String> whichTopicByConsumer(final String group) { - Set<String> topics = new HashSet<String>(); - - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); - String topicAtGroup = next.getKey(); - String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { - if (group.equals(arrays[1])) { - topics.add(arrays[0]); - } - } - } - - return topics; - } - - - public Set<String> whichGroupByTopic(final String topic) { - Set<String> groups = new HashSet<String>(); - - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); - String topicAtGroup = next.getKey(); - String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays != null && arrays.length == 2) { - if (topic.equals(arrays[0])) { - groups.add(arrays[1]); - } - } - } - - return groups; - } - - - public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - this.commitOffset(clientHost, key, queueId, offset); - } - - private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { - ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); - if (null == map) { - map = new ConcurrentHashMap<Integer, Long>(32); - map.put(queueId, offset); - this.offsetTable.put(key, map); - } else { - Long storeOffset = map.put(queueId, offset); - if (storeOffset != null && offset < storeOffset) { - log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); - } - } - } - - public long queryOffset(final String group, final String topic, final int queueId) { - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); - if (null != map) { - Long offset = map.get(queueId); - if (offset != null) - return offset; - } - - return -1; - } - - public String encode() { - return this.encode(false); - } - - @Override - public String configFilePath() { - return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - } - - @Override - public void decode(String jsonString) { - if (jsonString != null) { - ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); - if (obj != null) { - this.offsetTable = obj.offsetTable; - } - } - } - - public String encode(final boolean prettyFormat) { - return RemotingSerializable.toJson(this, prettyFormat); - } - - public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { - return offsetTable; - } - - - public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { - this.offsetTable = offsetTable; - } - - - public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) { - - Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>(); - Set<String> topicGroups = this.offsetTable.keySet(); - if (!UtilAll.isBlank(filterGroups)) { - for (String group : filterGroups.split(",")) { - Iterator<String> it = topicGroups.iterator(); - while (it.hasNext()) { - if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) { - it.remove(); - } - } - } - } - - for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { - String topicGroup = offSetEntry.getKey(); - String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); - if (topic.equals(topicGroupArr[0])) { - for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) { - long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey()); - if (entry.getValue() >= minOffset) { - Long offset = queueMinOffset.get(entry.getKey()); - if (offset == null) { - queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue())); - } else { - queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset)); - } - } - } - } - - } - return queueMinOffset; - } - - - public Map<Integer, Long> queryOffset(final String group, final String topic) { - // topic@group - String key = topic + TOPIC_GROUP_SEPARATOR + group; - return this.offsetTable.get(key); - } - - - public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { - ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); - if (offsets != null) { - this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets)); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java deleted file mode 100644 index f051d29..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.out; - -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult; -import com.alibaba.rocketmq.common.namesrv.TopAddressing; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.*; -import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; -import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.RemotingClient; -import com.alibaba.rocketmq.remoting.exception.*; -import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; -import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; - - -/** - * @author shijia.wxr - * @author manhong.yqd - */ -public class BrokerOuterAPI { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final RemotingClient remotingClient; - private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR); - private String nameSrvAddr = null; - - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { - this(nettyClientConfig, null); - } - - public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { - this.remotingClient = new NettyRemotingClient(nettyClientConfig); - this.remotingClient.registerRPCHook(rpcHook); - } - - public void start() { - this.remotingClient.start(); - } - - public void shutdown() { - this.remotingClient.shutdown(); - } - - public String fetchNameServerAddr() { - try { - String addrs = this.topAddressing.fetchNSAddr(); - if (addrs != null) { - if (!addrs.equals(this.nameSrvAddr)) { - log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs); - this.updateNameServerAddressList(addrs); - this.nameSrvAddr = addrs; - return nameSrvAddr; - } - } - } catch (Exception e) { - log.error("fetchNameServerAddr Exception", e); - } - return nameSrvAddr; - } - - public void updateNameServerAddressList(final String addrs) { - List<String> lst = new ArrayList<String>(); - String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); - } - } - - public RegisterBrokerResult registerBrokerAll( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, - final boolean oneway, - final int timeoutMills) { - RegisterBrokerResult registerBrokerResult = null; - - List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); - if (nameServerAddressList != null) { - for (String namesrvAddr : nameServerAddressList) { - try { - RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, - haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); - if (result != null) { - registerBrokerResult = result; - } - - log.info("register broker to name server {} OK", namesrvAddr); - } catch (Exception e) { - log.warn("registerBroker Exception, " + namesrvAddr, e); - } - } - } - - return registerBrokerResult; - } - - private RegisterBrokerResult registerBroker( - final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId, - final String haServerAddr, - final TopicConfigSerializeWrapper topicConfigWrapper, - final List<String> filterServerList, - final boolean oneway, - final int timeoutMills - ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException { - RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); - requestHeader.setBrokerAddr(brokerAddr); - requestHeader.setBrokerId(brokerId); - requestHeader.setBrokerName(brokerName); - requestHeader.setClusterName(clusterName); - requestHeader.setHaServerAddr(haServerAddr); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); - - RegisterBrokerBody requestBody = new RegisterBrokerBody(); - requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); - requestBody.setFilterServerList(filterServerList); - request.setBody(requestBody.encode()); - - if (oneway) { - try { - this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); - } catch (RemotingTooMuchRequestException e) { - } - return null; - } - - RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - RegisterBrokerResponseHeader responseHeader = - (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); - RegisterBrokerResult result = new RegisterBrokerResult(); - result.setMasterAddr(responseHeader.getMasterAddr()); - result.setHaServerAddr(responseHeader.getHaServerAddr()); - result.setHaServerAddr(responseHeader.getHaServerAddr()); - if (response.getBody() != null) { - result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); - } - return result; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public void unregisterBrokerAll( - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId - ) { - List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); - if (nameServerAddressList != null) { - for (String namesrvAddr : nameServerAddressList) { - try { - this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); - log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); - } catch (Exception e) { - log.warn("unregisterBroker Exception, " + namesrvAddr, e); - } - } - } - } - - public void unregisterBroker( - final String namesrvAddr, - final String clusterName, - final String brokerAddr, - final String brokerName, - final long brokerId - ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { - UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); - requestHeader.setBrokerAddr(brokerAddr); - requestHeader.setBrokerId(brokerId); - requestHeader.setBrokerName(brokerName); - requestHeader.setClusterName(clusterName); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader); - - RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return; - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, InterruptedException, MQBrokerException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); - - RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException, MQBrokerException, UnsupportedEncodingException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return new String(response.getBody(), MixAll.DEFAULT_CHARSET); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); - RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class); - } - default: - break; - } - - throw new MQBrokerException(response.getCode(), response.getRemark()); - } - - public void registerRPCHook(RPCHook rpcHook) { - remotingClient.registerRPCHook(rpcHook); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java deleted file mode 100644 index 8050bc1..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.pagecache; - -import com.alibaba.rocketmq.store.GetMessageResult; -import io.netty.channel.FileRegion; -import io.netty.util.AbstractReferenceCounted; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion { - private final ByteBuffer byteBufferHeader; - private final GetMessageResult getMessageResult; - private long transfered; // the bytes which was transfered already - - - public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) { - this.byteBufferHeader = byteBufferHeader; - this.getMessageResult = getMessageResult; - } - - - @Override - public long position() { - int pos = byteBufferHeader.position(); - List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); - for (ByteBuffer bb : messageBufferList) { - pos += bb.position(); - } - return pos; - } - - @Override - public long transfered() { - return transfered; - } - - @Override - public long count() { - return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize(); - } - - @Override - public long transferTo(WritableByteChannel target, long position) throws IOException { - if (this.byteBufferHeader.hasRemaining()) { - transfered += target.write(this.byteBufferHeader); - return transfered; - } else { - List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); - for (ByteBuffer bb : messageBufferList) { - if (bb.hasRemaining()) { - transfered += target.write(bb); - return transfered; - } - } - } - - return 0; - } - - public void close() { - this.deallocate(); - } - - @Override - protected void deallocate() { - this.getMessageResult.release(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java deleted file mode 100644 index df742c5..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.pagecache; - -import com.alibaba.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.FileRegion; -import io.netty.util.AbstractReferenceCounted; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - - -/** - * @author shijia.wxr - */ -public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion { - private final ByteBuffer byteBufferHeader; - private final SelectMappedBufferResult selectMappedBufferResult; - private long transfered; // the bytes which was transfered already - - - public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) { - this.byteBufferHeader = byteBufferHeader; - this.selectMappedBufferResult = selectMappedBufferResult; - } - - - @Override - public long position() { - return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position(); - } - - @Override - public long transfered() { - return transfered; - } - - @Override - public long count() { - return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize(); - } - - @Override - public long transferTo(WritableByteChannel target, long position) throws IOException { - if (this.byteBufferHeader.hasRemaining()) { - transfered += target.write(this.byteBufferHeader); - return transfered; - } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { - transfered += target.write(this.selectMappedBufferResult.getByteBuffer()); - return transfered; - } - - return 0; - } - - public void close() { - this.deallocate(); - } - - @Override - protected void deallocate() { - this.selectMappedBufferResult.release(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java deleted file mode 100644 index cbcbc74..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.pagecache; - -import com.alibaba.rocketmq.store.QueryMessageResult; -import io.netty.channel.FileRegion; -import io.netty.util.AbstractReferenceCounted; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion { - private final ByteBuffer byteBufferHeader; - private final QueryMessageResult queryMessageResult; - private long transfered; // the bytes which was transfered already - - - public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) { - this.byteBufferHeader = byteBufferHeader; - this.queryMessageResult = queryMessageResult; - } - - - @Override - public long position() { - int pos = byteBufferHeader.position(); - List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); - for (ByteBuffer bb : messageBufferList) { - pos += bb.position(); - } - return pos; - } - - @Override - public long transfered() { - return transfered; - } - - @Override - public long count() { - return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize(); - } - - @Override - public long transferTo(WritableByteChannel target, long position) throws IOException { - if (this.byteBufferHeader.hasRemaining()) { - transfered += target.write(this.byteBufferHeader); - return transfered; - } else { - List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); - for (ByteBuffer bb : messageBufferList) { - if (bb.hasRemaining()) { - transfered += target.write(bb); - return transfered; - } - } - } - - return 0; - } - - public void close() { - this.deallocate(); - } - - @Override - protected void deallocate() { - this.queryMessageResult.release(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java deleted file mode 100644 index 141ba69..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package com.alibaba.rocketmq.broker.plugin; - -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.store.*; - -import java.util.HashMap; -import java.util.Set; - -public abstract class AbstractPluginMessageStore implements MessageStore { - protected MessageStore next = null; - protected MessageStorePluginContext context; - - public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) { - this.next = next; - this.context = context; - } - - @Override - public long getEarliestMessageTime() { - return next.getEarliestMessageTime(); - } - - @Override - public long lockTimeMills() { - return next.lockTimeMills(); - } - - @Override - public boolean isOSPageCacheBusy() { - return next.isOSPageCacheBusy(); - } - - @Override - public boolean isTransientStorePoolDeficient() { - return next.isTransientStorePoolDeficient(); - } - - @Override - public boolean load() { - return next.load(); - } - - @Override - public void start() throws Exception { - next.start(); - } - - @Override - public void shutdown() { - next.shutdown(); - } - - @Override - public void destroy() { - next.destroy(); - } - - @Override - public PutMessageResult putMessage(MessageExtBrokerInner msg) { - return next.putMessage(msg); - } - - @Override - public GetMessageResult getMessage(String group, String topic, int queueId, long offset, - int maxMsgNums, SubscriptionData subscriptionData) { - return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData); - } - - @Override - public long getMaxOffsetInQuque(String topic, int queueId) { - return next.getMaxOffsetInQuque(topic, queueId); - } - - @Override - public long getMinOffsetInQuque(String topic, int queueId) { - return next.getMinOffsetInQuque(topic, queueId); - } - - @Override - public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) { - return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset); - } - - @Override - public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp); - } - - @Override - public MessageExt lookMessageByOffset(long commitLogOffset) { - return next.lookMessageByOffset(commitLogOffset); - } - - @Override - public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { - return next.selectOneMessageByOffset(commitLogOffset); - } - - @Override - public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { - return next.selectOneMessageByOffset(commitLogOffset, msgSize); - } - - @Override - public String getRunningDataInfo() { - return next.getRunningDataInfo(); - } - - @Override - public HashMap<String, String> getRuntimeInfo() { - return next.getRuntimeInfo(); - } - - @Override - public long getMaxPhyOffset() { - return next.getMaxPhyOffset(); - } - - @Override - public long getMinPhyOffset() { - return next.getMinPhyOffset(); - } - - @Override - public long getEarliestMessageTime(String topic, int queueId) { - return next.getEarliestMessageTime(topic, queueId); - } - - @Override - public long getMessageStoreTimeStamp(String topic, int queueId, long offset) { - return next.getMessageStoreTimeStamp(topic, queueId, offset); - } - - @Override - public long getMessageTotalInQueue(String topic, int queueId) { - return next.getMessageTotalInQueue(topic, queueId); - } - - @Override - public SelectMappedBufferResult getCommitLogData(long offset) { - return next.getCommitLogData(offset); - } - - @Override - public boolean appendToCommitLog(long startOffset, byte[] data) { - return next.appendToCommitLog(startOffset, data); - } - - @Override - public void excuteDeleteFilesManualy() { - next.excuteDeleteFilesManualy(); - } - - @Override - public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, - long end) { - return next.queryMessage(topic, key, maxNum, begin, end); - } - - @Override - public void updateHaMasterAddress(String newAddr) { - next.updateHaMasterAddress(newAddr); - } - - @Override - public long slaveFallBehindMuch() { - return next.slaveFallBehindMuch(); - } - - @Override - public long now() { - return next.now(); - } - - @Override - public int cleanUnusedTopic(Set<String> topics) { - return next.cleanUnusedTopic(topics); - } - - @Override - public void cleanExpiredConsumerQueue() { - next.cleanExpiredConsumerQueue(); - } - - @Override - public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) { - return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset); - } - - @Override - public long dispatchBehindBytes() { - return next.dispatchBehindBytes(); - } - - @Override - public long flush() { - return next.flush(); - } - - @Override - public boolean resetWriteOffset(long phyOffset) { - return next.resetWriteOffset(phyOffset); - } - - @Override - public long getConfirmOffset() { - return next.getConfirmOffset(); - } - - @Override - public void setConfirmOffset(long phyOffset) { - next.setConfirmOffset(phyOffset); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java deleted file mode 100644 index 84f5be7..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package com.alibaba.rocketmq.broker.plugin; - -import com.alibaba.rocketmq.store.MessageStore; - -import java.io.IOException; -import java.lang.reflect.Constructor; - -public final class MessageStoreFactory { - public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore) - throws IOException { - String plugin = context.getBrokerConfig().getMessageStorePlugIn(); - if (plugin != null && plugin.trim().length() != 0) { - String[] pluginClasses = plugin.split(","); - for (int i = pluginClasses.length - 1; i >= 0; --i) { - String pluginClass = pluginClasses[i]; - try { - @SuppressWarnings("unchecked") - Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); - Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); - AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore); - messageStore = pluginMessageStore; - } catch (Throwable e) { - throw new RuntimeException(String.format( - "Initialize plugin's class %s not found!", pluginClass), e); - } - } - } - return messageStore; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java deleted file mode 100644 index 15e8b07..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package com.alibaba.rocketmq.broker.plugin; - -import com.alibaba.rocketmq.common.BrokerConfig; -import com.alibaba.rocketmq.store.MessageArrivingListener; -import com.alibaba.rocketmq.store.config.MessageStoreConfig; -import com.alibaba.rocketmq.store.stats.BrokerStatsManager; - -public class MessageStorePluginContext { - private MessageStoreConfig messageStoreConfig; - private BrokerStatsManager brokerStatsManager; - private MessageArrivingListener messageArrivingListener; - private BrokerConfig brokerConfig; - - public MessageStorePluginContext(MessageStoreConfig messageStoreConfig, - BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, - BrokerConfig brokerConfig) { - super(); - this.messageStoreConfig = messageStoreConfig; - this.brokerStatsManager = brokerStatsManager; - this.messageArrivingListener = messageArrivingListener; - this.brokerConfig = brokerConfig; - } - - public MessageStoreConfig getMessageStoreConfig() { - return messageStoreConfig; - } - - public BrokerStatsManager getBrokerStatsManager() { - return brokerStatsManager; - } - - public MessageArrivingListener getMessageArrivingListener() { - return messageArrivingListener; - } - - public BrokerConfig getBrokerConfig() { - return brokerConfig; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java deleted file mode 100644 index 95db52d..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.processor; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext; -import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.TopicFilterType; -import com.alibaba.rocketmq.common.constant.DBMsgConstants; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.constant.PermName; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.MessageAccessor; -import com.alibaba.rocketmq.common.message.MessageConst; -import com.alibaba.rocketmq.common.message.MessageDecoder; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; -import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader; -import com.alibaba.rocketmq.common.sysflag.MessageSysFlag; -import com.alibaba.rocketmq.common.sysflag.TopicSysFlag; -import com.alibaba.rocketmq.common.utils.ChannelUtil; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; -import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.store.MessageExtBrokerInner; -import io.netty.channel.ChannelHandlerContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map; -import java.util.Random; - - -/** - * @author shijia.wxr - */ -public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor { - protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - protected final static int DLQ_NUMS_PER_GROUP = 1; - protected final BrokerController brokerController; - protected final Random random = new Random(System.currentTimeMillis()); - protected final SocketAddress storeHost; - private List<SendMessageHook> sendMessageHookList; - - - public AbstractSendMessageProcessor(final BrokerController brokerController) { - this.brokerController = brokerController; - this.storeHost = - new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController - .getNettyServerConfig().getListenPort()); - } - - protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx, - SendMessageRequestHeader requestHeader) { - if (!this.hasSendMessageHook()) { - return null; - } - SendMessageContext mqtraceContext; - mqtraceContext = new SendMessageContext(); - mqtraceContext.setProducerGroup(requestHeader.getProducerGroup()); - mqtraceContext.setTopic(requestHeader.getTopic()); - mqtraceContext.setMsgProps(requestHeader.getProperties()); - mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr()); - mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId()); - mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp()); - - Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties()); - String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); - properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); - requestHeader.setProperties(MessageDecoder.messageProperties2String(properties)); - - - if (uniqueKey == null) { - uniqueKey = ""; - } - mqtraceContext.setMsgUniqueKey(uniqueKey); - return mqtraceContext; - } - - public boolean hasSendMessageHook() { - return sendMessageHookList != null && !this.sendMessageHookList.isEmpty(); - } - - protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { - int queueIdInt = requestHeader.getQueueId(); - if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); - } - int sysFlag = requestHeader.getSysFlag(); - - if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { - sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; - } - - MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); - msgInner.setTopic(requestHeader.getTopic()); - msgInner.setBody(body); - msgInner.setFlag(requestHeader.getFlag()); - MessageAccessor.setProperties(msgInner, - MessageDecoder.string2messageProperties(requestHeader.getProperties())); - msgInner.setPropertiesString(requestHeader.getProperties()); - msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), - msgInner.getTags())); - - msgInner.setQueueId(queueIdInt); - msgInner.setSysFlag(sysFlag); - msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); - msgInner.setBornHost(ctx.channel().remoteAddress()); - msgInner.setStoreHost(this.getStoreHost()); - msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader - .getReconsumeTimes()); - return msgInner; - } - - public SocketAddress getStoreHost() { - return storeHost; - } - - protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, RemotingCommand request, - final RemotingCommand response) { - if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + requestHeader.getTopic().length()); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - return response; - } - if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long " - + requestHeader.getProperties().length()); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - return response; - } - if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) { - log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(), - request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel())); - response.setRemark("msg body must be less 64KB"); - response.setCode(ResponseCode.MESSAGE_ILLEGAL); - return response; - } - return response; - } - - protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, - final SendMessageRequestHeader requestHeader, final RemotingCommand response) { - if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) - && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { - response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() - + "] sending message is forbidden"); - return response; - } - if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { - String errorMsg = - "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; - log.warn(errorMsg); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorMsg); - return response; - } - - TopicConfig topicConfig = - this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); - if (null == topicConfig) { - int topicSysFlag = 0; - if (requestHeader.isUnitMode()) { - if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - topicSysFlag = TopicSysFlag.buildSysFlag(false, true); - } else { - topicSysFlag = TopicSysFlag.buildSysFlag(true, false); - } - } - - log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: " - + ctx.channel().remoteAddress()); - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// - requestHeader.getTopic(), // - requestHeader.getDefaultTopic(), // - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // - requestHeader.getDefaultTopicQueueNums(), topicSysFlag); - - if (null == topicConfig) { - if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - topicConfig = - this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( - requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, - topicSysFlag); - } - } - - if (null == topicConfig) { - response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" - + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); - return response; - } - } - - int queueIdInt = requestHeader.getQueueId(); - int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); - if (queueIdInt >= idValid) { - String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s", - queueIdInt, - topicConfig.toString(), - RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - log.warn(errorInfo); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(errorInfo); - - return response; - } - return response; - } - - public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) { - this.sendMessageHookList = sendMessageHookList; - } - - protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request, - final RemotingCommand response) { - if (!request.isOnewayRPC()) { - try { - ctx.writeAndFlush(response); - } catch (Throwable e) { - log.error("SendMessageProcessor process request over, but response failed", e); - log.error(request.toString()); - log.error(response.toString()); - } - } - } - - public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request, - SendMessageContext context) { - if (hasSendMessageHook()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - final SendMessageRequestHeader requestHeader = parseRequestHeader(request); - - if (null != requestHeader) { - context.setProducerGroup(requestHeader.getProducerGroup()); - context.setTopic(requestHeader.getTopic()); - context.setBodyLength(request.getBody().length); - context.setMsgProps(requestHeader.getProperties()); - context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - context.setBrokerAddr(this.brokerController.getBrokerAddr()); - context.setQueueId(requestHeader.getQueueId()); - } - - hook.sendMessageBefore(context); - requestHeader.setProperties(context.getMsgProps()); - } catch (Throwable e) { - } - } - } - } - - protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) - throws RemotingCommandException { - - SendMessageRequestHeaderV2 requestHeaderV2 = null; - SendMessageRequestHeader requestHeader = null; - switch (request.getCode()) { - case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); - case RequestCode.SEND_MESSAGE: - if (null == requestHeaderV2) { - requestHeader = - (SendMessageRequestHeader) request - .decodeCommandCustomHeader(SendMessageRequestHeader.class); - } else { - requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); - } - default: - break; - } - return requestHeader; - } - - public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) { - if (hasSendMessageHook()) { - for (SendMessageHook hook : this.sendMessageHookList) { - try { - if (response != null) { - final SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.readCustomHeader(); - context.setMsgId(responseHeader.getMsgId()); - context.setQueueId(responseHeader.getQueueId()); - context.setQueueOffset(responseHeader.getQueueOffset()); - context.setCode(response.getCode()); - context.setErrorMsg(response.getRemark()); - } - hook.sendMessageAfter(context); - } catch (Throwable e) { - - } - } - } - } - - @Override - public boolean rejectRequest() { - return false; - } -}
