http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java new file mode 100644 index 0000000..74e7ea7 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.client;

import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Registry of producer channels, grouped by producer group name.
 *
 * <p>All access to the internal table is guarded by {@code groupChannelLock}. Every
 * operation waits at most {@code LOCK_TIMEOUT_MILLIS} for the lock and gives up
 * (logging a warning) when it cannot be acquired in time.
 *
 * @author shijia.wxr
 */
public class ProducerManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    /** Maximum time to wait for {@code groupChannelLock}, in milliseconds. */
    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    /** A channel whose last update is older than this many milliseconds is removed by the scan. */
    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    private final Lock groupChannelLock = new ReentrantLock();
    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();


    public ProducerManager() {
    }


    /**
     * Returns a snapshot of the group-to-channel table.
     *
     * <p>Both the outer map and each per-group map are copied while the lock is held, so
     * the caller can iterate the result without racing against register/unregister/scan.
     * The {@link ClientChannelInfo} values themselves are shared, not copied.
     *
     * @return a new map; empty when the lock could not be acquired or the thread was interrupted
     */
    public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
        try {
            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    // Copy the inner maps too: handing out the live HashMap instances would let
                    // callers read them unsynchronized while other threads mutate them.
                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                        .entrySet()) {
                        newGroupChannelTable.put(entry.getKey(),
                            new HashMap<Channel, ClientChannelInfo>(entry.getValue()));
                    }
                } finally {
                    groupChannelLock.unlock();
                }
            } else {
                log.warn("ProducerManager getGroupChannelTable lock timeout");
            }
        } catch (InterruptedException e) {
            log.error("", e);
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        return newGroupChannelTable;
    }


    /**
     * Removes and closes every channel whose last update timestamp is older than
     * {@code CHANNEL_EXPIRED_TIMEOUT}.
     */
    public void scanNotActiveChannel() {
        try {
            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                        .entrySet()) {
                        final String group = entry.getKey();
                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

                        // Explicit iterator so entries can be removed while walking the map.
                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Channel, ClientChannelInfo> item = it.next();
                            final ClientChannelInfo info = item.getValue();

                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                                it.remove();
                                log.warn(
                                    "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
                                    RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
                                RemotingUtil.closeChannel(info.getChannel());
                            }
                        }
                    }
                } finally {
                    this.groupChannelLock.unlock();
                }
            } else {
                log.warn("ProducerManager scanNotActiveChannel lock timeout");
            }
        } catch (InterruptedException e) {
            log.error("", e);
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Removes the given channel from every producer group it is registered in.
     *
     * @param remoteAddr remote address of the closed channel; used for logging only
     * @param channel the closed channel; nothing happens when {@code null}
     */
    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
        if (channel != null) {
            try {
                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    try {
                        for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                            .entrySet()) {
                            final String group = entry.getKey();
                            final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
                                entry.getValue();
                            final ClientChannelInfo clientChannelInfo =
                                clientChannelInfoTable.remove(channel);
                            if (clientChannelInfo != null) {
                                log.info(
                                    "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
                                    clientChannelInfo.toString(), remoteAddr, group);
                            }

                        }
                    } finally {
                        this.groupChannelLock.unlock();
                    }
                } else {
                    log.warn("ProducerManager doChannelCloseEvent lock timeout");
                }
            } catch (InterruptedException e) {
                log.error("", e);
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Registers a producer channel under the given group, or refreshes the last-update
     * timestamp of the already registered entry for that channel.
     *
     * @param group producer group name
     * @param clientChannelInfo channel description to register
     */
    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        try {
            ClientChannelInfo clientChannelInfoFound = null;

            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                    if (null == channelTable) {
                        channelTable = new HashMap<Channel, ClientChannelInfo>();
                        this.groupChannelTable.put(group, channelTable);
                    }

                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                    if (null == clientChannelInfoFound) {
                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                        log.info("new producer connected, group: {} channel: {}", group,
                            clientChannelInfo.toString());
                    }
                } finally {
                    this.groupChannelLock.unlock();
                }

                // Already known channel: only refresh its timestamp.
                if (clientChannelInfoFound != null) {
                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
                }
            } else {
                log.warn("ProducerManager registerProducer lock timeout");
            }
        } catch (InterruptedException e) {
            log.error("", e);
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Removes a producer channel from the given group and drops the group once it has no
     * channels left.
     *
     * @param group producer group name
     * @param clientChannelInfo channel description to unregister
     */
    public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        try {
            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                    if (null != channelTable && !channelTable.isEmpty()) {
                        ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
                        if (old != null) {
                            log.info("unregister a producer[{}] from groupChannelTable {}", group,
                                clientChannelInfo.toString());
                        }

                        if (channelTable.isEmpty()) {
                            this.groupChannelTable.remove(group);
                            log.info("unregister a producer group[{}] from groupChannelTable", group);
                        }
                    }
                } finally {
                    this.groupChannelLock.unlock();
                }
            } else {
                log.warn("ProducerManager unregisterProducer lock timeout");
            }
        } catch (InterruptedException e) {
            log.error("", e);
            Thread.currentThread().interrupt();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java new file mode 100644 index 0000000..a38c9cb --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.client.net;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.message.MessageQueueForC;
import com.alibaba.rocketmq.common.protocol.RequestCode;
import com.alibaba.rocketmq.common.protocol.ResponseCode;
import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBodyForC;
import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.SelectMappedBufferResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Broker-side helper for requests that the broker initiates towards clients:
 * transaction state checks, consumer-id change notifications, offset resets and
 * consumer status queries.
 *
 * @author shijia.wxr
 */
public class Broker2Client {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public Broker2Client(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Sends a one-way CHECK_TRANSACTION_STATE request to a producer, with the stored
     * message appended as the payload.
     *
     * <p>The mapped buffer is released exactly once: by the write listener when the write
     * completes, or by the catch block when building or submitting the write fails.
     *
     * @param channel producer channel to write to
     * @param requestHeader header of the check request
     * @param selectMappedBufferResult stored message to transfer; released by this method
     */
    public void checkProducerTransactionState(
        final Channel channel,
        final CheckTransactionStateRequestHeader requestHeader,
        final SelectMappedBufferResult selectMappedBufferResult) {
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
        request.markOnewayRPC();

        try {
            FileRegion fileRegion =
                new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
                    selectMappedBufferResult);
            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    selectMappedBufferResult.release();
                    if (!future.isSuccess()) {
                        log.error("invokeProducer failed,", future.cause());
                    }
                }
            });
        } catch (Throwable e) {
            log.error("invokeProducer exception", e);
            selectMappedBufferResult.release();
        }
    }

    /**
     * Synchronously invokes a client over the given channel with a timeout of 10000
     * (presumably milliseconds; confirm against {@code RemotingServer.invokeSync}).
     *
     * @param channel client channel
     * @param request request to send
     * @return the client's response
     */
    public RemotingCommand callClient(final Channel channel,
        final RemotingCommand request
    ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
    }

    /**
     * Notifies a consumer, via a one-way request, that the set of consumer ids in its
     * group changed. Failures are logged and not propagated.
     *
     * @param channel consumer channel to notify
     * @param consumerGroup group whose membership changed; the call is a no-op when {@code null}
     */
    public void notifyConsumerIdsChanged(
        final Channel channel,
        final String consumerGroup) {
        if (null == consumerGroup) {
            log.error("notifyConsumerIdsChanged consumerGroup is null");
            return;
        }

        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);

        try {
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
        } catch (Exception e) {
            // Pass the exception itself: the message has no placeholder, so a string
            // argument would be dropped and the cause lost.
            log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e);
        }
    }

    /**
     * Resets consumer offsets using the non-C++ body encoding.
     *
     * @see #resetOffset(String, String, long, boolean, boolean)
     */
    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
        return resetOffset(topic, group, timeStamp, isForce, false);
    }

    /**
     * Computes new offsets for every write queue of {@code topic} and pushes them to all
     * online consumers of {@code group} with one-way requests.
     *
     * <p>For each queue the target offset is the offset at {@code timeStamp} (or the max
     * offset when {@code timeStamp == -1}). Unless {@code isForce} is set, the target is
     * only applied when it is smaller than the current consumer offset.
     *
     * @param topic topic whose offsets are reset
     * @param group consumer group
     * @param timeStamp timestamp to reset to, or {@code -1} for the queue's max offset
     * @param isForce apply the computed offset even if it is not behind the current one
     * @param isC encode the request body for C++ clients
     * @return a response carrying the offset table on success, or an error code and remark
     */
    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
        boolean isC) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
            return response;
        }

        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue();
            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
            mq.setTopic(topic);
            mq.setQueueId(i);

            long consumerOffset =
                this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
            if (-1 == consumerOffset) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("THe consumer group <%s> not exist", group));
                return response;
            }

            long timeStampOffset;
            if (timeStamp == -1) {
                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
            } else {
                timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
            }

            if (timeStampOffset < 0) {
                log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
                timeStampOffset = 0;
            }

            if (isForce || timeStampOffset < consumerOffset) {
                offsetTable.put(mq, timeStampOffset);
            } else {
                offsetTable.put(mq, consumerOffset);
            }
        }

        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timeStamp);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
        if (isC) {
            // c++ language
            ResetOffsetBodyForC body = new ResetOffsetBodyForC();
            List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
            body.setOffsetTable(offsetList);
            request.setBody(body.encode());
        } else {
            // other language
            ResetOffsetBody body = new ResetOffsetBody();
            body.setOffsetTable(offsetTable);
            request.setBody(body.encode());
        }

        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

        if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
                consumerGroupInfo.getChannelInfoTable();
            for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                int version = entry.getValue().getVersion();
                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                    try {
                        this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                        log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                            new Object[]{topic, group, entry.getValue().getClientId()});
                    } catch (Exception e) {
                        // The exception goes last inside the argument array so that both
                        // placeholders are filled and the stack trace is still logged.
                        log.error("[reset-offset] reset offset exception. topic={}, group={}",
                            new Object[]{topic, group, e});
                    }
                } else {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("the client does not support this feature. version="
                        + MQVersion.getVersionDesc(version));
                    log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                    return response;
                }
            }
        } else {
            String errorInfo =
                String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                    requestHeader.getGroup(),
                    requestHeader.getTopic(),
                    requestHeader.getTimestamp());
            log.error(errorInfo);
            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
            response.setRemark(errorInfo);
            return response;
        }
        response.setCode(ResponseCode.SUCCESS);
        ResetOffsetBody resBody = new ResetOffsetBody();
        resBody.setOffsetTable(offsetTable);
        response.setBody(resBody.encode());
        return response;
    }

    /** Flattens the queue-to-offset map into the list form used by the C++ body. */
    private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
        List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
        for (Entry<MessageQueue, Long> entry : table.entrySet()) {
            MessageQueue mq = entry.getKey();
            MessageQueueForC tmp =
                new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
            list.add(tmp);
        }
        return list;
    }

    /**
     * Queries the consume status of the online consumers of a group and collects the
     * per-client queue tables.
     *
     * @param topic topic to query
     * @param group consumer group to query
     * @param originClientId when non-blank, only this client is queried; otherwise all clients
     * @return SUCCESS with the collected table, or SYSTEM_ERROR when the group has no online
     *         consumer or a client is older than V3_0_7_SNAPSHOT
     */
    public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
        final RemotingCommand result = RemotingCommand.createResponseCommand(null);

        GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
                requestHeader);

        Map<String, Map<MessageQueue, Long>> consumerStatusTable =
            new HashMap<String, Map<MessageQueue, Long>>();
        // The group lookup may return null (resetOffset guards against it as well), so do not
        // dereference it blindly.
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
        ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo == null ? null : consumerGroupInfo.getChannelInfoTable();
        if (null == channelInfoTable || channelInfoTable.isEmpty()) {
            result.setCode(ResponseCode.SYSTEM_ERROR);
            result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
            return result;
        }

        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            String clientId = entry.getValue().getClientId();
            if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                result.setCode(ResponseCode.SYSTEM_ERROR);
                result.setRemark("the client does not support this feature. version="
                    + MQVersion.getVersionDesc(version));
                log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return result;
            } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
                try {
                    RemotingCommand response =
                        this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
                    assert response != null;
                    switch (response.getCode()) {
                        case ResponseCode.SUCCESS: {
                            if (response.getBody() != null) {
                                GetConsumerStatusBody body =
                                    GetConsumerStatusBody.decode(response.getBody(),
                                        GetConsumerStatusBody.class);

                                consumerStatusTable.put(clientId, body.getMessageQueueTable());
                                log.info(
                                    "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
                                    new Object[]{topic, group, clientId});
                            }
                            break;
                        }
                        default:
                            break;
                    }
                } catch (Exception e) {
                    log.error(
                        "[get-consumer-status] get consumer status exception. topic={}, group={}",
                        new Object[]{topic, group, e});
                }

                // A specific client was requested and has now been handled.
                if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
                    break;
                }
            }
        }

        result.setCode(ResponseCode.SUCCESS);
        GetConsumerStatusBody resBody = new GetConsumerStatusBody();
        resBody.setConsumerTable(consumerStatusTable);
        result.setBody(resBody.encode());
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java new file mode 100644 index 0000000..84be628 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.broker.client.rebalance;

import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Keeps, per consumer group, which client currently owns the lock on each message queue.
 *
 * <p>A lock entry expires when it has not been refreshed for
 * {@code REBALANCE_LOCK_MAX_LIVE_TIME} milliseconds (system property
 * {@code rocketmq.broker.rebalance.lockMaxLiveTime}, default 60000); an expired entry can
 * be taken over by another client. Ownership checks first run without the internal lock;
 * every change of ownership happens while holding it.
 *
 * @author shijia.wxr
 */
public class RebalanceLockManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
    private final Lock lock = new ReentrantLock();
    private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

    /**
     * Tries to lock one message queue for a client.
     *
     * @param group consumer group
     * @param mq queue to lock
     * @param clientId client requesting the lock
     * @return {@code true} when the client owns the lock afterwards; {@code false} when another
     *         client holds an unexpired lock or the calling thread was interrupted
     */
    public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {

        if (!this.isLocked(group, mq, clientId)) {
            try {
                this.lock.lockInterruptibly();
                try {
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
                        this.mqLockTable.put(group, groupValue);
                    }

                    LockEntry lockEntry = groupValue.get(mq);
                    if (null == lockEntry) {
                        lockEntry = new LockEntry();
                        lockEntry.setClientId(clientId);
                        groupValue.put(mq, lockEntry);
                        log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
                            group, //
                            clientId, //
                            mq);
                    }

                    if (lockEntry.isLocked(clientId)) {
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        return true;
                    }

                    String oldClientId = lockEntry.getClientId();


                    if (lockEntry.isExpired()) {
                        lockEntry.setClientId(clientId);
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        log.warn(
                            "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
                            group, //
                            oldClientId, //
                            clientId, //
                            mq);
                        return true;
                    }


                    log.warn(
                        "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
                        group, //
                        oldClientId, //
                        clientId, //
                        mq);
                    return false;
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("tryLock interrupted", e);
                Thread.currentThread().interrupt();
                // Nothing was acquired, so do not report the queue as locked.
                return false;
            }
        }

        // Fast path: the client already holds an unexpired lock.
        return true;
    }

    /**
     * Lock-free check whether {@code clientId} holds an unexpired lock on the queue; refreshes
     * the entry's timestamp when it does.
     */
    private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }

                return locked;
            }
        }

        return false;
    }

    /**
     * Tries to lock several message queues for a client.
     *
     * @param group consumer group
     * @param mqs queues to lock
     * @param clientId client requesting the locks
     * @return the subset of {@code mqs} the client owns afterwards; when the thread is
     *         interrupted, only the queues already owned before the call are returned
     */
    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId) {
        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());


        // First pass without the global lock: separate queues already owned by this client.
        for (MessageQueue mq : mqs) {
            if (this.isLocked(group, mq, clientId)) {
                lockedMqs.add(mq);
            } else {
                notLockedMqs.add(mq);
            }
        }

        if (!notLockedMqs.isEmpty()) {
            try {
                this.lock.lockInterruptibly();
                try {
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
                        this.mqLockTable.put(group, groupValue);
                    }


                    for (MessageQueue mq : notLockedMqs) {
                        LockEntry lockEntry = groupValue.get(mq);
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                            log.info(
                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
                                group, //
                                clientId, //
                                mq);
                        }


                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }

                        String oldClientId = lockEntry.getClientId();


                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            log.warn(
                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
                                group, //
                                oldClientId, //
                                clientId, //
                                mq);
                            lockedMqs.add(mq);
                            continue;
                        }


                        log.warn(
                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
                            group, //
                            oldClientId, //
                            clientId, //
                            mq);
                    }
                } finally {
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("tryLockBatch interrupted", e);
                Thread.currentThread().interrupt();
            }
        }

        return lockedMqs;
    }

    /**
     * Releases the locks on the given queues, but only those currently owned by
     * {@code clientId}; queues locked by other clients or not locked at all are logged and
     * left untouched.
     *
     * @param group consumer group
     * @param mqs queues to unlock
     * @param clientId client releasing the locks
     */
    public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
        try {
            this.lock.lockInterruptibly();
            try {
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null != groupValue) {
                    for (MessageQueue mq : mqs) {
                        LockEntry lockEntry = groupValue.get(mq);
                        if (null != lockEntry) {
                            if (lockEntry.getClientId().equals(clientId)) {
                                groupValue.remove(mq);
                                log.info("unlockBatch, Group: {} {} {}",
                                    group,
                                    mq,
                                    clientId);
                            } else {
                                log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
                                    lockEntry.getClientId(),
                                    group,
                                    mq,
                                    clientId);
                            }
                        } else {
                            log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
                                group,
                                mq,
                                clientId);
                        }
                    }
                } else {
                    log.warn("unlockBatch, group not exist, Group: {} {}",
                        group,
                        clientId);
                }
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            log.error("unlockBatch interrupted", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Owner and last refresh time of the lock on one message queue. */
    static class LockEntry {
        // volatile: read by the lock-free isLocked() fast path while owners change under the lock.
        private volatile String clientId;
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();


        public String getClientId() {
            return clientId;
        }


        public void setClientId(String clientId) {
            this.clientId = clientId;
        }


        public long getLastUpdateTimestamp() {
            return lastUpdateTimestamp;
        }


        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }

        /** @return {@code true} when this entry belongs to {@code clientId} and has not expired */
        public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        /** @return {@code true} when the entry was last refreshed more than the max live time ago */
        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java new file mode 100644 index 0000000..b2e7e82 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package com.alibaba.rocketmq.broker.filtersrv;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.BrokerStartup;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 * Tracks the filter servers registered with this broker and periodically launches new
 * filter server processes until the configured number is registered.
 */
public class FilterServerManager {

    /** A filter server not refreshed for longer than this many milliseconds is dropped by the scan. */
    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
        new ConcurrentHashMap<Channel, FilterServerInfo>(16);
    private final BrokerController brokerController;

    private ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));

    public FilterServerManager(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Schedules {@link #createFilterServer()} to run 5 seconds from now and every 30 seconds
     * afterwards. Exceptions of a run are logged so that later runs still happen.
     */
    public void start() {

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    FilterServerManager.this.createFilterServer();
                } catch (Exception e) {
                    log.error("", e);
                }
            }
        }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
    }

    /**
     * Launches one filter server process for each server that is configured
     * ({@code getFilterServerNums()}) but not yet registered.
     */
    public void createFilterServer() {
        int more =
            this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
        if (more <= 0) {
            // Enough servers registered already; no need to build the command.
            return;
        }

        String cmd = this.buildStartCommand();
        for (int i = 0; i < more; i++) {
            FilterServerUtil.callShell(cmd, log);
        }
    }

    /**
     * Builds the platform specific shell command that starts a filter server, passing on the
     * broker's config file ({@code -c}) and name server address ({@code -n}) when set.
     */
    private String buildStartCommand() {
        String config = "";
        if (BrokerStartup.configFile != null) {
            config = String.format("-c %s", BrokerStartup.configFile);
        }

        if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
            config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
        }

        if (RemotingUtil.isWindowsPlatform()) {
            return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
                this.brokerController.getBrokerConfig().getRocketmqHome(),
                config);
        } else {
            return String.format("sh %s/bin/startfsrv.sh %s",
                this.brokerController.getBrokerConfig().getRocketmqHome(),
                config);
        }
    }

    /** Stops the periodic creation task. */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }

    /**
     * Registers a filter server for the given channel, or refreshes the timestamp of the
     * entry already registered for it.
     *
     * @param channel channel the filter server is connected through
     * @param filterServerAddr address reported by the filter server
     */
    public void registerFilterServer(final Channel channel, final String filterServerAddr) {
        FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
        if (filterServerInfo == null) {
            FilterServerInfo created = new FilterServerInfo();
            created.setFilterServerAddr(filterServerAddr);
            created.setLastUpdateTimestamp(System.currentTimeMillis());
            // putIfAbsent: if another thread registered the same channel meanwhile, keep its
            // entry and fall through to the refresh below instead of replacing it.
            filterServerInfo = this.filterServerTable.putIfAbsent(channel, created);
            if (filterServerInfo == null) {
                log.info("Receive a New Filter Server<{}>", filterServerAddr);
                return;
            }
        }

        filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
    }

    /**
     * Removes every filter server whose last update is older than
     * {@link #FILTER_SERVER_MAX_IDLE_TIME_MILLS} and closes its channel.
     */
    public void scanNotActiveChannel() {

        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Channel, FilterServerInfo> next = it.next();
            long timestamp = next.getValue().getLastUpdateTimestamp();
            Channel channel = next.getKey();
            if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
                log.info("The Filter Server<{}> expired, remove it", next.getKey());
                it.remove();
                RemotingUtil.closeChannel(channel);
            }
        }
    }

    /**
     * Drops the filter server registered for a closed channel, if any.
     *
     * @param remoteAddr remote address of the closed channel; used for logging only
     * @param channel the closed channel
     */
    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
        FilterServerInfo old = this.filterServerTable.remove(channel);
        if (old != null) {
            log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
                remoteAddr);
        }
    }

    /** @return the addresses of all currently registered filter servers, as a new list */
    public List<String> buildNewFilterServerList() {
        List<String> addr = new ArrayList<String>();
        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Channel, FilterServerInfo> next = it.next();
            addr.add(next.getValue().getFilterServerAddr());
        }
        return addr;
    }

    /** Address and last refresh time of one registered filter server. */
    static class FilterServerInfo {
        private String filterServerAddr;
        // volatile: written by registerFilterServer and read by scanNotActiveChannel, which
        // this class does not otherwise synchronize.
        private volatile long lastUpdateTimestamp;


        public String getFilterServerAddr() {
            return filterServerAddr;
        }


        public void setFilterServerAddr(String filterServerAddr) {
            this.filterServerAddr = filterServerAddr;
        }


        public long getLastUpdateTimestamp() {
            return lastUpdateTimestamp;
        }


        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java new file mode 100644 index 0000000..c5ace19 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.broker.filtersrv; + +import org.slf4j.Logger; + + +public class FilterServerUtil { + public static void callShell(final String shellString, final Logger log) { + Process process = null; + try { + String[] cmdArray = splitShellString(shellString); + process = Runtime.getRuntime().exec(cmdArray); + process.waitFor(); + log.info("callShell: <{}> OK", shellString); + } catch (Throwable e) { + log.error("callShell: readLine IOException, " + shellString, e); + } finally { + if (null != process) + process.destroy(); + } + } + + private static String[] splitShellString(final String shellString) { + String[] split = shellString.split(" "); + return split; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java new file mode 100644 index 0000000..586bed0 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.latency; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.remoting.netty.RequestTask; +import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class BrokerFastFailure { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "BrokerFastFailureScheduledThread")); + private final BrokerController brokerController; + + public BrokerFastFailure(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + public void start() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpiredRequest(); + } + }, 1000, 10, TimeUnit.MILLISECONDS); + } + + private void cleanExpiredRequest() { + while 
(this.brokerController.getMessageStore().isOSPageCacheBusy()) { + try { + if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { + final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS); + if (null == runnable) { + break; + } + + final RequestTask rt = castRunnable(runnable); + rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size())); + } else { + break; + } + } catch (Throwable e) { + } + } + + while (true) { + try { + if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { + final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek(); + if (null == runnable) { + break; + } + final RequestTask rt = castRunnable(runnable); + if (rt.isStopRun()) { + break; + } + + final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); + if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) { + if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) { + rt.setStopRun(true); + rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size())); + } + } else { + break; + } + } else { + break; + } + } catch (Throwable e) { + } + } + } + + public static RequestTask castRunnable(final Runnable runnable) { + try { + FutureTaskExt object = (FutureTaskExt) runnable; + return (RequestTask) object.getRunnable(); + } catch (Throwable e) { + log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e); + } + + return null; + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java new file mode 100644 index 0000000..f81d48a --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.broker.latency; + +import java.util.concurrent.*; + +/** + * @author shijia.wxr + */ +public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { + return new FutureTaskExt<T>(runnable, value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java new 
/**
 * A {@link FutureTask} that remembers the {@link Runnable} it was created from, so that code
 * inspecting a queue of futures can get back to the original task object.
 *
 * @param <V> the result type of the future
 * @author shijia.wxr
 */
public class FutureTaskExt<V> extends FutureTask<V> {
    /** The runnable this task was built from; {@code null} when it was built from a callable. */
    private final Runnable wrapped;

    /**
     * Creates a task from a callable. There is no runnable to remember in this case, so
     * {@link #getRunnable()} returns {@code null}.
     */
    public FutureTaskExt(Callable<V> callable) {
        super(callable);
        this.wrapped = null;
    }

    /**
     * Creates a task that runs {@code runnable} and then completes with {@code result}.
     */
    public FutureTaskExt(Runnable runnable, V result) {
        super(runnable, result);
        this.wrapped = runnable;
    }

    /**
     * @return the runnable passed to the constructor, or {@code null} for a callable based task
     */
    public Runnable getRunnable() {
        return this.wrapped;
    }
}
/dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.longpolling; + +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class ManyPullRequest { + private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>(); + + + public synchronized void addPullRequest(final PullRequest pullRequest) { + this.pullRequestList.add(pullRequest); + } + + + public synchronized void addPullRequest(final List<PullRequest> many) { + this.pullRequestList.addAll(many); + } + + + public synchronized List<PullRequest> cloneListAndClear() { + if (!this.pullRequestList.isEmpty()) { + List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone(); + this.pullRequestList.clear(); + return result; + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java new file mode 100644 index 0000000..15ee050 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.rocketmq.broker.longpolling; + +import com.alibaba.rocketmq.store.MessageArrivingListener; + + +public class NotifyMessageArrivingListener implements MessageArrivingListener { + private final PullRequestHoldService pullRequestHoldService; + + + public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) { + this.pullRequestHoldService = pullRequestHoldService; + } + + + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { + this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java new file mode 100644 index 0000000..b4f1e11 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.broker.longpolling; + +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import io.netty.channel.Channel; + + +/** + * @author shijia.wxr + */ +public class PullRequest { + private final RemotingCommand requestCommand; + private final Channel clientChannel; + private final long timeoutMillis; + private final long suspendTimestamp; + private final long pullFromThisOffset; + private final SubscriptionData subscriptionData; + + + public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp, + long pullFromThisOffset, SubscriptionData subscriptionData) { + this.requestCommand = requestCommand; + this.clientChannel = clientChannel; + this.timeoutMillis = timeoutMillis; + this.suspendTimestamp = suspendTimestamp; + this.pullFromThisOffset = pullFromThisOffset; + this.subscriptionData = subscriptionData; + } + + + public RemotingCommand getRequestCommand() { + return requestCommand; + } + + + public Channel getClientChannel() { + return clientChannel; + } + + + public long getTimeoutMillis() { + return timeoutMillis; + } + + + public long getSuspendTimestamp() { + return suspendTimestamp; + } + + + public long getPullFromThisOffset() { + return pullFromThisOffset; + } + + public SubscriptionData getSubscriptionData() { + return subscriptionData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java 
b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java new file mode 100644 index 0000000..888c5f2 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.longpolling; + +import com.alibaba.rocketmq.broker.BrokerController; +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.SystemClock; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.store.DefaultMessageFilter; +import com.alibaba.rocketmq.store.MessageFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class PullRequestHoldService extends ServiceThread { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final String TOPIC_QUEUEID_SEPARATOR = "@"; + private final BrokerController brokerController; + private final SystemClock systemClock = new SystemClock(); + private final MessageFilter messageFilter = new DefaultMessageFilter(); + private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = + new ConcurrentHashMap<String, ManyPullRequest>(1024); + + + public PullRequestHoldService(final BrokerController brokerController) { + this.brokerController = brokerController; + } + + public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { + String key = this.buildKey(topic, queueId); + ManyPullRequest mpr = this.pullRequestTable.get(key); + if (null == mpr) { + mpr = new ManyPullRequest(); + ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); + if (prev != null) { + mpr = prev; + } + } + + mpr.addPullRequest(pullRequest); + } + + private String buildKey(final String topic, final int queueId) { + StringBuilder sb = new StringBuilder(); + sb.append(topic); + sb.append(TOPIC_QUEUEID_SEPARATOR); + sb.append(queueId); + return sb.toString(); + } + + @Override + public void run() { + log.info(this.getServiceName() + " service started"); + while 
(!this.isStopped()) { + try { + if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { + this.waitForRunning(5 * 1000); + } else { + this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); + } + + long beginLockTimestamp = this.systemClock.now(); + this.checkHoldRequest(); + long costTime = this.systemClock.now() - beginLockTimestamp; + if (costTime > 5 * 1000) { + log.info("[NOTIFYME] check hold request cost {} ms.", costTime); + } + } catch (Throwable e) { + log.warn(this.getServiceName() + " service has exception. ", e); + } + } + + log.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + return PullRequestHoldService.class.getSimpleName(); + } + + private void checkHoldRequest() { + for (String key : this.pullRequestTable.keySet()) { + String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); + if (kArray != null && 2 == kArray.length) { + String topic = kArray[0]; + int queueId = Integer.parseInt(kArray[1]); + final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); + try { + this.notifyMessageArriving(topic, queueId, offset); + } catch (Throwable e) { + log.error("check hold request failed. 
topic={}, queueId={}", topic, queueId, e); + } + } + } + } + + public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { + notifyMessageArriving(topic, queueId, maxOffset, null); + } + + public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) { + String key = this.buildKey(topic, queueId); + ManyPullRequest mpr = this.pullRequestTable.get(key); + if (mpr != null) { + List<PullRequest> requestList = mpr.cloneListAndClear(); + if (requestList != null) { + List<PullRequest> replayList = new ArrayList<PullRequest>(); + + for (PullRequest request : requestList) { + long newestOffset = maxOffset; + if (newestOffset <= request.getPullFromThisOffset()) { + newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); + } + + Long tmp = tagsCode; + if (newestOffset > request.getPullFromThisOffset()) { + if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) { + try { + this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + request.getRequestCommand()); + } catch (Throwable e) { + log.error("execute request when wakeup failed.", e); + } + continue; + } + } + + if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { + try { + this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + request.getRequestCommand()); + } catch (Throwable e) { + log.error("execute request when wakeup failed.", e); + } + continue; + } + + + replayList.add(request); + } + + if (!replayList.isEmpty()) { + mpr.addPullRequest(replayList); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git 
a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java new file mode 100644 index 0000000..b7f9c6e --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.broker.mqtrace; + +import com.alibaba.rocketmq.store.stats.BrokerStatsManager; + +import java.util.Map; + + +public class ConsumeMessageContext { + private String consumerGroup; + private String topic; + private Integer queueId; + private String clientHost; + private String storeHost; + private Map<String, Long> messageIds; + private int bodyLength; + private boolean success; + private String status; + private Object mqTraceContext; + + private String commercialOwner; + private BrokerStatsManager.StatsType commercialRcvStats; + private int commercialRcvTimes; + private int commercialRcvSize; + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public Integer getQueueId() { + return queueId; + } + + + public void setQueueId(Integer queueId) { + this.queueId = queueId; + } + + + public String getClientHost() { + return clientHost; + } + + + public void setClientHost(String clientHost) { + this.clientHost = clientHost; + } + + + public String getStoreHost() { + return storeHost; + } + + + public void setStoreHost(String storeHost) { + this.storeHost = storeHost; + } + + + public Map<String, Long> getMessageIds() { + return messageIds; + } + + + public void setMessageIds(Map<String, Long> messageIds) { + this.messageIds = messageIds; + } + + + public boolean isSuccess() { + return success; + } + + + public void setSuccess(boolean success) { + this.success = success; + } + + + public String getStatus() { + return status; + } + + + public void setStatus(String status) { + this.status = status; + } + + + public Object getMqTraceContext() { + return mqTraceContext; + } + + + public void setMqTraceContext(Object mqTraceContext) { + this.mqTraceContext = mqTraceContext; + } + + + public int 
getBodyLength() { + return bodyLength; + } + + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } + + public String getCommercialOwner() { + return commercialOwner; + } + + public void setCommercialOwner(final String commercialOwner) { + this.commercialOwner = commercialOwner; + } + + public BrokerStatsManager.StatsType getCommercialRcvStats() { + return commercialRcvStats; + } + + public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) { + this.commercialRcvStats = commercialRcvStats; + } + + public int getCommercialRcvTimes() { + return commercialRcvTimes; + } + + public void setCommercialRcvTimes(final int commercialRcvTimes) { + this.commercialRcvTimes = commercialRcvTimes; + } + + public int getCommercialRcvSize() { + return commercialRcvSize; + } + + public void setCommercialRcvSize(final int commercialRcvSize) { + this.commercialRcvSize = commercialRcvSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java new file mode 100644 index 0000000..4a74db3 --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.broker.mqtrace;

/**
 * Hook contract built around a {@link ConsumeMessageContext}: one naming
 * method plus a "before" and an "after" callback that each receive the
 * context object.
 *
 * NOTE(review): the call sites are not part of this file. The before/after
 * timing described below is inferred from the method names only; confirm
 * against the broker code that invokes the hook.
 */
public interface ConsumeMessageHook {

    /**
     * @return the name of this hook
     */
    String hookName();

    /**
     * Callback that, judging by its name, runs before a message is consumed.
     *
     * @param context the consume-message context handed to the hook
     */
    void consumeMessageBefore(ConsumeMessageContext context);

    /**
     * Callback that, judging by its name, runs after a message is consumed.
     *
     * @param context the consume-message context handed to the hook
     */
    void consumeMessageAfter(ConsumeMessageContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java new file mode 100644 index 0000000..5bd29cf --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.broker.mqtrace;

import com.alibaba.rocketmq.common.message.MessageType;
import com.alibaba.rocketmq.store.stats.BrokerStatsManager;

import java.util.Properties;


/**
 * Mutable value holder that is passed to the callbacks of
 * {@link SendMessageHook}.
 *
 * Each property is a plain private field exposed through one getter and one
 * setter. Setters store the argument as-is: nothing is validated or copied,
 * and no synchronization is performed. Until a setter is called, reference
 * properties are {@code null}, numeric properties are {@code 0} and the
 * success flag is {@code false}. The one exception is the message type,
 * which starts out as {@link MessageType#Trans_msg_Commit}.
 */
public class SendMessageContext {
    private String producerGroup;
    private String topic;
    private String msgId;
    private String originMsgId;
    private Integer queueId;
    private Long queueOffset;
    private String brokerAddr;
    private String bornHost;
    private int bodyLength;
    private int code;
    private String errorMsg;
    private String msgProps;
    // Opaque to this class: stored and returned without inspection.
    private Object mqTraceContext;
    private Properties extProps;
    private String brokerRegionId;
    private String msgUniqueKey;
    private long bornTimeStamp;
    // Deliberate non-null default; every other reference field starts as null.
    private MessageType msgType = MessageType.Trans_msg_Commit;
    // Left at the language default, false, until setSuccess is called.
    private boolean isSuccess;
    // Commercial fields.
    private String commercialOwner;
    private BrokerStatsManager.StatsType commercialSendStats;
    private int commercialSendSize;
    private int commercialSendTimes;

    /** @return the producer group, or {@code null} if it was never set */
    public String getProducerGroup() {
        return this.producerGroup;
    }

    /** @param producerGroup the producer group to store */
    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    /** @return the topic, or {@code null} if it was never set */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic to store */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the message id, or {@code null} if it was never set */
    public String getMsgId() {
        return this.msgId;
    }

    /** @param msgId the message id to store */
    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    /** @return the origin message id, or {@code null} if it was never set */
    public String getOriginMsgId() {
        return this.originMsgId;
    }

    /** @param originMsgId the origin message id to store */
    public void setOriginMsgId(String originMsgId) {
        this.originMsgId = originMsgId;
    }

    /** @return the queue id as a boxed value, or {@code null} if it was never set */
    public Integer getQueueId() {
        return this.queueId;
    }

    /** @param queueId the queue id to store; {@code null} is accepted */
    public void setQueueId(Integer queueId) {
        this.queueId = queueId;
    }

    /** @return the queue offset as a boxed value, or {@code null} if it was never set */
    public Long getQueueOffset() {
        return this.queueOffset;
    }

    /** @param queueOffset the queue offset to store; {@code null} is accepted */
    public void setQueueOffset(Long queueOffset) {
        this.queueOffset = queueOffset;
    }

    /** @return the broker address, or {@code null} if it was never set */
    public String getBrokerAddr() {
        return this.brokerAddr;
    }

    /** @param brokerAddr the broker address to store */
    public void setBrokerAddr(String brokerAddr) {
        this.brokerAddr = brokerAddr;
    }

    /** @return the born host, or {@code null} if it was never set */
    public String getBornHost() {
        return this.bornHost;
    }

    /** @param bornHost the born host to store */
    public void setBornHost(String bornHost) {
        this.bornHost = bornHost;
    }

    /** @return the body length; {@code 0} if it was never set */
    public int getBodyLength() {
        return this.bodyLength;
    }

    /** @param bodyLength the body length to store */
    public void setBodyLength(int bodyLength) {
        this.bodyLength = bodyLength;
    }

    /** @return the code; {@code 0} if it was never set */
    public int getCode() {
        return this.code;
    }

    /** @param code the code to store */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the error message, or {@code null} if it was never set */
    public String getErrorMsg() {
        return this.errorMsg;
    }

    /** @param errorMsg the error message to store */
    public void setErrorMsg(String errorMsg) {
        this.errorMsg = errorMsg;
    }

    /** @return the message properties string, or {@code null} if it was never set */
    public String getMsgProps() {
        return this.msgProps;
    }

    /** @param msgProps the message properties string to store */
    public void setMsgProps(String msgProps) {
        this.msgProps = msgProps;
    }

    /** @return the trace context object exactly as it was set, or {@code null} */
    public Object getMqTraceContext() {
        return this.mqTraceContext;
    }

    /** @param mqTraceContext the trace context object to store; any type is accepted */
    public void setMqTraceContext(Object mqTraceContext) {
        this.mqTraceContext = mqTraceContext;
    }

    /** @return the very {@link Properties} instance that was last set (not a copy), or {@code null} */
    public Properties getExtProps() {
        return this.extProps;
    }

    /** @param extProps the properties to store; the reference is kept, no copy is made */
    public void setExtProps(Properties extProps) {
        this.extProps = extProps;
    }

    /** @return the broker region id, or {@code null} if it was never set */
    public String getBrokerRegionId() {
        return this.brokerRegionId;
    }

    /** @param brokerRegionId the broker region id to store */
    public void setBrokerRegionId(String brokerRegionId) {
        this.brokerRegionId = brokerRegionId;
    }

    /** @return the message unique key, or {@code null} if it was never set */
    public String getMsgUniqueKey() {
        return this.msgUniqueKey;
    }

    /** @param msgUniqueKey the message unique key to store */
    public void setMsgUniqueKey(String msgUniqueKey) {
        this.msgUniqueKey = msgUniqueKey;
    }

    /** @return the born timestamp; {@code 0} if it was never set */
    public long getBornTimeStamp() {
        return this.bornTimeStamp;
    }

    /** @param bornTimeStamp the born timestamp to store */
    public void setBornTimeStamp(long bornTimeStamp) {
        this.bornTimeStamp = bornTimeStamp;
    }

    /** @return the message type; {@link MessageType#Trans_msg_Commit} unless changed by the setter */
    public MessageType getMsgType() {
        return this.msgType;
    }

    /** @param msgType the message type to store; {@code null} is not rejected */
    public void setMsgType(MessageType msgType) {
        this.msgType = msgType;
    }

    /** @return the success flag; {@code false} if it was never set */
    public boolean isSuccess() {
        return this.isSuccess;
    }

    /** @param success the success flag to store */
    public void setSuccess(boolean success) {
        this.isSuccess = success;
    }

    /** @return the commercial owner, or {@code null} if it was never set */
    public String getCommercialOwner() {
        return this.commercialOwner;
    }

    /** @param commercialOwner the commercial owner to store */
    public void setCommercialOwner(String commercialOwner) {
        this.commercialOwner = commercialOwner;
    }

    /** @return the commercial send stats type, or {@code null} if it was never set */
    public BrokerStatsManager.StatsType getCommercialSendStats() {
        return this.commercialSendStats;
    }

    /** @param commercialSendStats the commercial send stats type to store */
    public void setCommercialSendStats(BrokerStatsManager.StatsType commercialSendStats) {
        this.commercialSendStats = commercialSendStats;
    }

    /** @return the commercial send size value; {@code 0} if it was never set */
    public int getCommercialSendSize() {
        return this.commercialSendSize;
    }

    /** @param commercialSendSize the commercial send size value to store */
    public void setCommercialSendSize(int commercialSendSize) {
        this.commercialSendSize = commercialSendSize;
    }

    /** @return the commercial send times value; {@code 0} if it was never set */
    public int getCommercialSendTimes() {
        return this.commercialSendTimes;
    }

    /** @param commercialSendTimes the commercial send times value to store */
    public void setCommercialSendTimes(int commercialSendTimes) {
        this.commercialSendTimes = commercialSendTimes;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java new file mode 100644 index 0000000..e079b9f --- /dev/null +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.alibaba.rocketmq.broker.mqtrace;

/**
 * Hook contract built around a {@link SendMessageContext}: one naming method
 * plus a "before" and an "after" callback that each receive the context
 * object.
 *
 * Interface methods are implicitly {@code public}, so the modifier is not
 * repeated on the declarations; implementations and callers are unaffected.
 *
 * NOTE(review): the call sites are not part of this file. The before/after
 * timing described below is inferred from the method names only; confirm
 * against the broker code that invokes the hook.
 */
public interface SendMessageHook {

    /**
     * @return the name of this hook
     */
    String hookName();

    /**
     * Callback that, judging by its name, runs before a message is sent.
     *
     * @param context the send-message context handed to the hook
     */
    void sendMessageBefore(SendMessageContext context);

    /**
     * Callback that, judging by its name, runs after a message is sent.
     *
     * @param context the send-message context handed to the hook
     */
    void sendMessageAfter(SendMessageContext context);
}
