http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java deleted file mode 100644 index 74e7ea7..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.client; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * @author shijia.wxr - */ -public class ProducerManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final long LOCK_TIMEOUT_MILLIS = 3000; - private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final Lock groupChannelLock = new ReentrantLock(); - private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = - new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); - - - public ProducerManager() { - } - - - public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() { - HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = - new HashMap<String, HashMap<Channel, ClientChannelInfo>>(); - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - newGroupChannelTable.putAll(groupChannelTable); - } finally { - groupChannelLock.unlock(); - } - } - } catch (InterruptedException e) { - log.error("", e); - } - return newGroupChannelTable; - } - - - public void scanNotActiveChannel() { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable - .entrySet()) { - final String group = entry.getKey(); - final HashMap<Channel, ClientChannelInfo> 
chlMap = entry.getValue(); - - Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, ClientChannelInfo> item = it.next(); - // final Integer id = item.getKey(); - final ClientChannelInfo info = item.getValue(); - - long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp(); - if (diff > CHANNEL_EXPIRED_TIMEOUT) { - it.remove(); - log.warn( - "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", - RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group); - RemotingUtil.closeChannel(info.getChannel()); - } - } - } - } finally { - this.groupChannelLock.unlock(); - } - } else { - log.warn("ProducerManager scanNotActiveChannel lock timeout"); - } - } catch (InterruptedException e) { - log.error("", e); - } - } - - - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { - if (channel != null) { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable - .entrySet()) { - final String group = entry.getKey(); - final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable = - entry.getValue(); - final ClientChannelInfo clientChannelInfo = - clientChannelInfoTable.remove(channel); - if (clientChannelInfo != null) { - log.info( - "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", - clientChannelInfo.toString(), remoteAddr, group); - } - - } - } finally { - this.groupChannelLock.unlock(); - } - } else { - log.warn("ProducerManager doChannelCloseEvent lock timeout"); - } - } catch (InterruptedException e) { - log.error("", e); - } - } - } - - - public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { - try { - ClientChannelInfo clientChannelInfoFound = null; - - if 
(this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); - if (null == channelTable) { - channelTable = new HashMap<Channel, ClientChannelInfo>(); - this.groupChannelTable.put(group, channelTable); - } - - clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); - if (null == clientChannelInfoFound) { - channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); - log.info("new producer connected, group: {} channel: {}", group, - clientChannelInfo.toString()); - } - } finally { - this.groupChannelLock.unlock(); - } - - if (clientChannelInfoFound != null) { - clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); - } - } else { - log.warn("ProducerManager registerProducer lock timeout"); - } - } catch (InterruptedException e) { - log.error("", e); - } - } - - - public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) { - try { - if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); - if (null != channelTable && !channelTable.isEmpty()) { - ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); - if (old != null) { - log.info("unregister a producer[{}] from groupChannelTable {}", group, - clientChannelInfo.toString()); - } - - if (channelTable.isEmpty()) { - this.groupChannelTable.remove(group); - log.info("unregister a producer group[{}] from groupChannelTable", group); - } - } - } finally { - this.groupChannelLock.unlock(); - } - } else { - log.warn("ProducerManager unregisterProducer lock timeout"); - } - } catch (InterruptedException e) { - log.error("", e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java deleted file mode 100644 index a38c9cb..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.client.net; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.client.ClientChannelInfo; -import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo; -import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer; -import com.alibaba.rocketmq.common.MQVersion; -import com.alibaba.rocketmq.common.TopicConfig; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.message.MessageQueueForC; -import com.alibaba.rocketmq.common.protocol.RequestCode; -import com.alibaba.rocketmq.common.protocol.ResponseCode; -import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody; -import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody; -import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBodyForC; -import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; -import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; -import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import com.alibaba.rocketmq.store.SelectMappedBufferResult; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.FileRegion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import 
java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class Broker2Client { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final BrokerController brokerController; - - public Broker2Client(BrokerController brokerController) { - this.brokerController = brokerController; - } - - public void checkProducerTransactionState( - final Channel channel, - final CheckTransactionStateRequestHeader requestHeader, - final SelectMappedBufferResult selectMappedBufferResult) { - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader); - request.markOnewayRPC(); - - try { - FileRegion fileRegion = - new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()), - selectMappedBufferResult); - channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - selectMappedBufferResult.release(); - if (!future.isSuccess()) { - log.error("invokeProducer failed,", future.cause()); - } - } - }); - } catch (Throwable e) { - log.error("invokeProducer exception", e); - selectMappedBufferResult.release(); - } - } - - public RemotingCommand callClient(final Channel channel, - final RemotingCommand request - ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000); - } - - public void notifyConsumerIdsChanged( - final Channel channel, - final String consumerGroup) { - if (null == consumerGroup) { - log.error("notifyConsumerIdsChanged consumerGroup is null"); - return; - } - - NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - RemotingCommand request = - 
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); - - try { - this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); - } catch (Exception e) { - log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); - } - } - - public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) { - return resetOffset(topic, group, timeStamp, isForce, false); - } - - public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, - boolean isC) { - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (null == topicConfig) { - log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic); - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic); - return response; - } - - Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); - - for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { - MessageQueue mq = new MessageQueue(); - mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); - mq.setTopic(topic); - mq.setQueueId(i); - - long consumerOffset = - this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); - if (-1 == consumerOffset) { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("THe consumer group <%s> not exist", group)); - return response; - } - - long timeStampOffset; - if (timeStamp == -1) { - - timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i); - } else { - timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); - } - - if (timeStampOffset < 0) { - log.warn("reset offset is invalid. 
topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset); - timeStampOffset = 0; - } - - if (isForce || timeStampOffset < consumerOffset) { - offsetTable.put(mq, timeStampOffset); - } else { - offsetTable.put(mq, consumerOffset); - } - } - - ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setGroup(group); - requestHeader.setTimestamp(timeStamp); - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); - if (isC) { - // c++ language - ResetOffsetBodyForC body = new ResetOffsetBodyForC(); - List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable); - body.setOffsetTable(offsetList); - request.setBody(body.encode()); - } else { - // other language - ResetOffsetBody body = new ResetOffsetBody(); - body.setOffsetTable(offsetTable); - request.setBody(body.encode()); - } - - ConsumerGroupInfo consumerGroupInfo = - this.brokerController.getConsumerManager().getConsumerGroupInfo(group); - - if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - consumerGroupInfo.getChannelInfoTable(); - for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { - int version = entry.getValue().getVersion(); - if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { - try { - this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); - log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", - new Object[]{topic, group, entry.getValue().getClientId()}); - } catch (Exception e) { - log.error("[reset-offset] reset offset exception. topic={}, group={}", - new Object[]{topic, group}, e); - } - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark("the client does not support this feature. 
version=" - + MQVersion.getVersionDesc(version)); - log.warn("[reset-offset] the client does not support this feature. version={}", - RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); - return response; - } - } - } else { - String errorInfo = - String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", - requestHeader.getGroup(), - requestHeader.getTopic(), - requestHeader.getTimestamp()); - log.error(errorInfo); - response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); - response.setRemark(errorInfo); - return response; - } - response.setCode(ResponseCode.SUCCESS); - ResetOffsetBody resBody = new ResetOffsetBody(); - resBody.setOffsetTable(offsetTable); - response.setBody(resBody.encode()); - return response; - } - - private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) { - List<MessageQueueForC> list = new ArrayList<MessageQueueForC>(); - for (Entry<MessageQueue, Long> entry : table.entrySet()) { - MessageQueue mq = entry.getKey(); - MessageQueueForC tmp = - new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue()); - list.add(tmp); - } - return list; - } - - public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) { - final RemotingCommand result = RemotingCommand.createResponseCommand(null); - - GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); - requestHeader.setTopic(topic); - requestHeader.setGroup(group); - RemotingCommand request = - RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, - requestHeader); - - Map<String, Map<MessageQueue, Long>> consumerStatusTable = - new HashMap<String, Map<MessageQueue, Long>>(); - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = - this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); - if (null == channelInfoTable || 
channelInfoTable.isEmpty()) { - result.setCode(ResponseCode.SYSTEM_ERROR); - result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group)); - return result; - } - - for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { - int version = entry.getValue().getVersion(); - String clientId = entry.getValue().getClientId(); - if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { - result.setCode(ResponseCode.SYSTEM_ERROR); - result.setRemark("the client does not support this feature. version=" - + MQVersion.getVersionDesc(version)); - log.warn("[get-consumer-status] the client does not support this feature. version={}", - RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); - return result; - } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { - try { - RemotingCommand response = - this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000); - assert response != null; - switch (response.getCode()) { - case ResponseCode.SUCCESS: { - if (response.getBody() != null) { - GetConsumerStatusBody body = - GetConsumerStatusBody.decode(response.getBody(), - GetConsumerStatusBody.class); - - consumerStatusTable.put(clientId, body.getMessageQueueTable()); - log.info( - "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", - new Object[]{topic, group, clientId}); - } - } - default: - break; - } - } catch (Exception e) { - log.error( - "[get-consumer-status] get consumer status exception. 
topic={}, group={}, offset={}", - new Object[]{topic, group}, e); - } - - if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) { - break; - } - } - } - - result.setCode(ResponseCode.SUCCESS); - GetConsumerStatusBody resBody = new GetConsumerStatusBody(); - resBody.setConsumerTable(consumerStatusTable); - result.setBody(resBody.encode()); - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java deleted file mode 100644 index 84be628..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.client.rebalance; - -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.common.message.MessageQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * @author shijia.wxr - */ -public class RebalanceLockManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME); - private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( - "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); - private final Lock lock = new ReentrantLock(); - private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = - new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); - - public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { - - if (!this.isLocked(group, mq, clientId)) { - try { - this.lock.lockInterruptibly(); - try { - ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); - if (null == groupValue) { - groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32); - this.mqLockTable.put(group, groupValue); - } - - LockEntry lockEntry = groupValue.get(mq); - if (null == lockEntry) { - lockEntry = new LockEntry(); - lockEntry.setClientId(clientId); - groupValue.put(mq, lockEntry); - log.info("tryLock, message queue not locked, I got it. 
Group: {} NewClientId: {} {}", // - group, // - clientId, // - mq); - } - - if (lockEntry.isLocked(clientId)) { - lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); - return true; - } - - String oldClientId = lockEntry.getClientId(); - - - if (lockEntry.isExpired()) { - lockEntry.setClientId(clientId); - lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); - log.warn( - "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); - return true; - } - - - log.warn( - "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); - return false; - } finally { - this.lock.unlock(); - } - } catch (InterruptedException e) { - log.error("putMessage exception", e); - } - } else { - - } - - return true; - } - - private boolean isLocked(final String group, final MessageQueue mq, final String clientId) { - ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); - if (groupValue != null) { - LockEntry lockEntry = groupValue.get(mq); - if (lockEntry != null) { - boolean locked = lockEntry.isLocked(clientId); - if (locked) { - lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); - } - - return locked; - } - } - - return false; - } - - public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, - final String clientId) { - Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); - Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); - - - for (MessageQueue mq : mqs) { - if (this.isLocked(group, mq, clientId)) { - lockedMqs.add(mq); - } else { - notLockedMqs.add(mq); - } - } - - if (!notLockedMqs.isEmpty()) { - try { - this.lock.lockInterruptibly(); - try { - ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); - if (null == groupValue) { - groupValue = new 
ConcurrentHashMap<MessageQueue, LockEntry>(32); - this.mqLockTable.put(group, groupValue); - } - - - for (MessageQueue mq : notLockedMqs) { - LockEntry lockEntry = groupValue.get(mq); - if (null == lockEntry) { - lockEntry = new LockEntry(); - lockEntry.setClientId(clientId); - groupValue.put(mq, lockEntry); - log.info( - "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // - mq); - } - - - if (lockEntry.isLocked(clientId)) { - lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); - lockedMqs.add(mq); - continue; - } - - String oldClientId = lockEntry.getClientId(); - - - if (lockEntry.isExpired()) { - lockEntry.setClientId(clientId); - lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); - log.warn( - "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); - lockedMqs.add(mq); - continue; - } - - - log.warn( - "tryLockBatch, message queue locked by other client. 
Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // - mq); - } - } finally { - this.lock.unlock(); - } - } catch (InterruptedException e) { - log.error("putMessage exception", e); - } - } - - return lockedMqs; - } - - public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { - try { - this.lock.lockInterruptibly(); - try { - ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); - if (null != groupValue) { - for (MessageQueue mq : mqs) { - LockEntry lockEntry = groupValue.get(mq); - if (null != lockEntry) { - if (lockEntry.getClientId().equals(clientId)) { - groupValue.remove(mq); - log.info("unlockBatch, Group: {} {} {}", - group, - mq, - clientId); - } else { - log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}", - lockEntry.getClientId(), - group, - mq, - clientId); - } - } else { - log.warn("unlockBatch, but mq not locked, Group: {} {} {}", - group, - mq, - clientId); - } - } - } else { - log.warn("unlockBatch, group not exist, Group: {} {}", - group, - clientId); - } - } finally { - this.lock.unlock(); - } - } catch (InterruptedException e) { - log.error("putMessage exception", e); - } - } - - static class LockEntry { - private String clientId; - private volatile long lastUpdateTimestamp = System.currentTimeMillis(); - - - public String getClientId() { - return clientId; - } - - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - - public long getLastUpdateTimestamp() { - return lastUpdateTimestamp; - } - - - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { - this.lastUpdateTimestamp = lastUpdateTimestamp; - } - - public boolean isLocked(final String clientId) { - boolean eq = this.clientId.equals(clientId); - return eq && !this.isExpired(); - } - - public boolean isExpired() { - boolean expired = - (System.currentTimeMillis() - this.lastUpdateTimestamp) > 
REBALANCE_LOCK_MAX_LIVE_TIME; - - return expired; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java deleted file mode 100644 index b2e7e82..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.rocketmq.broker.filtersrv; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.broker.BrokerStartup; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.common.RemotingUtil; -import io.netty.channel.Channel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -public class FilterServerManager { - - public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable = - new ConcurrentHashMap<Channel, FilterServerInfo>(16); - private final BrokerController brokerController; - - private ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread")); - - public FilterServerManager(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - public void start() { - - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - FilterServerManager.this.createFilterServer(); - } catch (Exception e) { - log.error("", e); - } - } - }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS); - } - - public void createFilterServer() { - int more = - this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size(); - String cmd = this.buildStartCommand(); - for (int i = 0; i < more; i++) { - FilterServerUtil.callShell(cmd, log); - } - } - - private String 
buildStartCommand() { - String config = ""; - if (BrokerStartup.configFile != null) { - config = String.format("-c %s", BrokerStartup.configFile); - } - - if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) { - config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr()); - } - - if (RemotingUtil.isWindowsPlatform()) { - return String.format("start /b %s\\bin\\mqfiltersrv.exe %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); - } else { - return String.format("sh %s/bin/startfsrv.sh %s", - this.brokerController.getBrokerConfig().getRocketmqHome(), - config); - } - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } - - public void registerFilterServer(final Channel channel, final String filterServerAddr) { - FilterServerInfo filterServerInfo = this.filterServerTable.get(channel); - if (filterServerInfo != null) { - filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); - } else { - filterServerInfo = new FilterServerInfo(); - filterServerInfo.setFilterServerAddr(filterServerAddr); - filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); - this.filterServerTable.put(channel, filterServerInfo); - log.info("Receive a New Filter Server<{}>", filterServerAddr); - } - } - - /** - - */ - public void scanNotActiveChannel() { - - Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, FilterServerInfo> next = it.next(); - long timestamp = next.getValue().getLastUpdateTimestamp(); - Channel channel = next.getKey(); - if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) { - log.info("The Filter Server<{}> expired, remove it", next.getKey()); - it.remove(); - RemotingUtil.closeChannel(channel); - } - } - } - - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { - FilterServerInfo old = 
this.filterServerTable.remove(channel); - if (old != null) { - log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(), - remoteAddr); - } - } - - public List<String> buildNewFilterServerList() { - List<String> addr = new ArrayList<String>(); - Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<Channel, FilterServerInfo> next = it.next(); - addr.add(next.getValue().getFilterServerAddr()); - } - return addr; - } - - static class FilterServerInfo { - private String filterServerAddr; - private long lastUpdateTimestamp; - - - public String getFilterServerAddr() { - return filterServerAddr; - } - - - public void setFilterServerAddr(String filterServerAddr) { - this.filterServerAddr = filterServerAddr; - } - - - public long getLastUpdateTimestamp() { - return lastUpdateTimestamp; - } - - - public void setLastUpdateTimestamp(long lastUpdateTimestamp) { - this.lastUpdateTimestamp = lastUpdateTimestamp; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java deleted file mode 100644 index c5ace19..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.rocketmq.broker.filtersrv; - -import org.slf4j.Logger; - - -public class FilterServerUtil { - public static void callShell(final String shellString, final Logger log) { - Process process = null; - try { - String[] cmdArray = splitShellString(shellString); - process = Runtime.getRuntime().exec(cmdArray); - process.waitFor(); - log.info("callShell: <{}> OK", shellString); - } catch (Throwable e) { - log.error("callShell: readLine IOException, " + shellString, e); - } finally { - if (null != process) - process.destroy(); - } - } - - private static String[] splitShellString(final String shellString) { - String[] split = shellString.split(" "); - return split; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java deleted file mode 100644 index 586bed0..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.latency; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.common.ThreadFactoryImpl; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.remoting.netty.RequestTask; -import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class BrokerFastFailure { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "BrokerFastFailureScheduledThread")); - private final BrokerController brokerController; - - public BrokerFastFailure(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - public void start() { - this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - cleanExpiredRequest(); - } - }, 1000, 10, TimeUnit.MILLISECONDS); - } - - private void cleanExpiredRequest() { - while (this.brokerController.getMessageStore().isOSPageCacheBusy()) { - try { - if 
(!this.brokerController.getSendThreadPoolQueue().isEmpty()) { - final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS); - if (null == runnable) { - break; - } - - final RequestTask rt = castRunnable(runnable); - rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size())); - } else { - break; - } - } catch (Throwable e) { - } - } - - while (true) { - try { - if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { - final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek(); - if (null == runnable) { - break; - } - final RequestTask rt = castRunnable(runnable); - if (rt.isStopRun()) { - break; - } - - final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); - if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) { - if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) { - rt.setStopRun(true); - rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size())); - } - } else { - break; - } - } else { - break; - } - } catch (Throwable e) { - } - } - } - - public static RequestTask castRunnable(final Runnable runnable) { - try { - FutureTaskExt object = (FutureTaskExt) runnable; - return (RequestTask) object.getRunnable(); - } catch (Throwable e) { - log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e); - } - - return null; - } - - public void shutdown() { - this.scheduledExecutorService.shutdown(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java deleted file mode 100644 index f81d48a..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.rocketmq.broker.latency; - -import java.util.concurrent.*; - -/** - * @author shijia.wxr - */ -public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); - } - - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - } - - @Override - protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) { - return new FutureTaskExt<T>(runnable, value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java deleted file mode 100644 index 
/**
 * A {@link FutureTask} that remembers the {@link Runnable} it was created from.
 *
 * @author shijia.wxr
 */
public class FutureTaskExt<V> extends FutureTask<V> {
    /** The wrapped runnable; {@code null} when the task was built from a callable. */
    private final Runnable runnable;

    /**
     * Creates a task that runs {@code runnable} and yields {@code result} on completion.
     */
    public FutureTaskExt(final Runnable runnable, final V result) {
        super(runnable, result);
        this.runnable = runnable;
    }

    /**
     * Creates a task from a callable; no runnable is recorded in this case.
     */
    public FutureTaskExt(final Callable<V> callable) {
        super(callable);
        this.runnable = null;
    }

    /**
     * @return the runnable passed at construction, or {@code null} for callable based tasks
     */
    public Runnable getRunnable() {
        return this.runnable;
    }
}
a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.longpolling; - -import java.util.ArrayList; -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class ManyPullRequest { - private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>(); - - - public synchronized void addPullRequest(final PullRequest pullRequest) { - this.pullRequestList.add(pullRequest); - } - - - public synchronized void addPullRequest(final List<PullRequest> many) { - this.pullRequestList.addAll(many); - } - - - public synchronized List<PullRequest> cloneListAndClear() { - if (!this.pullRequestList.isEmpty()) { - List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone(); - this.pullRequestList.clear(); - return result; - } - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ---------------------------------------------------------------------- diff --git 
a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java deleted file mode 100644 index 15ee050..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.alibaba.rocketmq.broker.longpolling; - -import com.alibaba.rocketmq.store.MessageArrivingListener; - - -public class NotifyMessageArrivingListener implements MessageArrivingListener { - private final PullRequestHoldService pullRequestHoldService; - - - public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) { - this.pullRequestHoldService = pullRequestHoldService; - } - - - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode) { - this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java deleted file mode 100644 index b4f1e11..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.longpolling; - -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.Channel; - - -/** - * @author shijia.wxr - */ -public class PullRequest { - private final RemotingCommand requestCommand; - private final Channel clientChannel; - private final long timeoutMillis; - private final long suspendTimestamp; - private final long pullFromThisOffset; - private final SubscriptionData subscriptionData; - - - public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp, - long pullFromThisOffset, SubscriptionData subscriptionData) { - this.requestCommand = requestCommand; - this.clientChannel = clientChannel; - this.timeoutMillis = timeoutMillis; - this.suspendTimestamp = suspendTimestamp; - this.pullFromThisOffset = pullFromThisOffset; - this.subscriptionData = subscriptionData; - } - - - public RemotingCommand getRequestCommand() { - return requestCommand; - } - - - public Channel getClientChannel() { - return clientChannel; - } - - - public long getTimeoutMillis() { - return timeoutMillis; - } - - - public long getSuspendTimestamp() { - return suspendTimestamp; - } - - - public long getPullFromThisOffset() { - return pullFromThisOffset; - } - - public SubscriptionData getSubscriptionData() { - return subscriptionData; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java deleted file mode 100644 index 888c5f2..0000000 --- 
a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.longpolling; - -import com.alibaba.rocketmq.broker.BrokerController; -import com.alibaba.rocketmq.common.ServiceThread; -import com.alibaba.rocketmq.common.SystemClock; -import com.alibaba.rocketmq.common.constant.LoggerName; -import com.alibaba.rocketmq.store.DefaultMessageFilter; -import com.alibaba.rocketmq.store.MessageFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class PullRequestHoldService extends ServiceThread { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final String TOPIC_QUEUEID_SEPARATOR = "@"; - private final BrokerController brokerController; - private final SystemClock systemClock = new SystemClock(); - private final MessageFilter messageFilter = new DefaultMessageFilter(); - private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> 
pullRequestTable = - new ConcurrentHashMap<String, ManyPullRequest>(1024); - - - public PullRequestHoldService(final BrokerController brokerController) { - this.brokerController = brokerController; - } - - public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { - String key = this.buildKey(topic, queueId); - ManyPullRequest mpr = this.pullRequestTable.get(key); - if (null == mpr) { - mpr = new ManyPullRequest(); - ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); - if (prev != null) { - mpr = prev; - } - } - - mpr.addPullRequest(pullRequest); - } - - private String buildKey(final String topic, final int queueId) { - StringBuilder sb = new StringBuilder(); - sb.append(topic); - sb.append(TOPIC_QUEUEID_SEPARATOR); - sb.append(queueId); - return sb.toString(); - } - - @Override - public void run() { - log.info(this.getServiceName() + " service started"); - while (!this.isStopped()) { - try { - if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { - this.waitForRunning(5 * 1000); - } else { - this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); - } - - long beginLockTimestamp = this.systemClock.now(); - this.checkHoldRequest(); - long costTime = this.systemClock.now() - beginLockTimestamp; - if (costTime > 5 * 1000) { - log.info("[NOTIFYME] check hold request cost {} ms.", costTime); - } - } catch (Throwable e) { - log.warn(this.getServiceName() + " service has exception. 
", e); - } - } - - log.info(this.getServiceName() + " service end"); - } - - @Override - public String getServiceName() { - return PullRequestHoldService.class.getSimpleName(); - } - - private void checkHoldRequest() { - for (String key : this.pullRequestTable.keySet()) { - String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); - if (kArray != null && 2 == kArray.length) { - String topic = kArray[0]; - int queueId = Integer.parseInt(kArray[1]); - final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); - try { - this.notifyMessageArriving(topic, queueId, offset); - } catch (Throwable e) { - log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); - } - } - } - } - - public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { - notifyMessageArriving(topic, queueId, maxOffset, null); - } - - public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) { - String key = this.buildKey(topic, queueId); - ManyPullRequest mpr = this.pullRequestTable.get(key); - if (mpr != null) { - List<PullRequest> requestList = mpr.cloneListAndClear(); - if (requestList != null) { - List<PullRequest> replayList = new ArrayList<PullRequest>(); - - for (PullRequest request : requestList) { - long newestOffset = maxOffset; - if (newestOffset <= request.getPullFromThisOffset()) { - newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); - } - - Long tmp = tagsCode; - if (newestOffset > request.getPullFromThisOffset()) { - if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) { - try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), - request.getRequestCommand()); - } catch (Throwable e) { - log.error("execute request when wakeup failed.", e); - } - continue; - } - } - - if (System.currentTimeMillis() >= 
(request.getSuspendTimestamp() + request.getTimeoutMillis())) { - try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), - request.getRequestCommand()); - } catch (Throwable e) { - log.error("execute request when wakeup failed.", e); - } - continue; - } - - - replayList.add(request); - } - - if (!replayList.isEmpty()) { - mpr.addPullRequest(replayList); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java deleted file mode 100644 index b7f9c6e..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.broker.mqtrace; - -import com.alibaba.rocketmq.store.stats.BrokerStatsManager; - -import java.util.Map; - - -public class ConsumeMessageContext { - private String consumerGroup; - private String topic; - private Integer queueId; - private String clientHost; - private String storeHost; - private Map<String, Long> messageIds; - private int bodyLength; - private boolean success; - private String status; - private Object mqTraceContext; - - private String commercialOwner; - private BrokerStatsManager.StatsType commercialRcvStats; - private int commercialRcvTimes; - private int commercialRcvSize; - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public String getTopic() { - return topic; - } - - - public void setTopic(String topic) { - this.topic = topic; - } - - - public Integer getQueueId() { - return queueId; - } - - - public void setQueueId(Integer queueId) { - this.queueId = queueId; - } - - - public String getClientHost() { - return clientHost; - } - - - public void setClientHost(String clientHost) { - this.clientHost = clientHost; - } - - - public String getStoreHost() { - return storeHost; - } - - - public void setStoreHost(String storeHost) { - this.storeHost = storeHost; - } - - - public Map<String, Long> getMessageIds() { - return messageIds; - } - - - public void setMessageIds(Map<String, Long> messageIds) { - this.messageIds = messageIds; - } - - - public boolean isSuccess() { - return success; - } - - - public void setSuccess(boolean success) { - this.success = success; - } - - - public String getStatus() { - return status; - } - - - public void setStatus(String status) { - this.status = status; - } - - - public Object getMqTraceContext() { - return mqTraceContext; - } - - - public void setMqTraceContext(Object mqTraceContext) { - this.mqTraceContext = mqTraceContext; - } - - - public int 
getBodyLength() { - return bodyLength; - } - - - public void setBodyLength(int bodyLength) { - this.bodyLength = bodyLength; - } - - public String getCommercialOwner() { - return commercialOwner; - } - - public void setCommercialOwner(final String commercialOwner) { - this.commercialOwner = commercialOwner; - } - - public BrokerStatsManager.StatsType getCommercialRcvStats() { - return commercialRcvStats; - } - - public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) { - this.commercialRcvStats = commercialRcvStats; - } - - public int getCommercialRcvTimes() { - return commercialRcvTimes; - } - - public void setCommercialRcvTimes(final int commercialRcvTimes) { - this.commercialRcvTimes = commercialRcvTimes; - } - - public int getCommercialRcvSize() { - return commercialRcvSize; - } - - public void setCommercialRcvSize(final int commercialRcvSize) { - this.commercialRcvSize = commercialRcvSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java deleted file mode 100644 index 4a74db3..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.mqtrace; - -public interface ConsumeMessageHook { - String hookName(); - - - void consumeMessageBefore(final ConsumeMessageContext context); - - - void consumeMessageAfter(final ConsumeMessageContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java deleted file mode 100644 index 5bd29cf..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.mqtrace; - -import com.alibaba.rocketmq.common.message.MessageType; -import com.alibaba.rocketmq.store.stats.BrokerStatsManager; - -import java.util.Properties; - - -/** Mutable holder of the values recorded for one send-message trace; every field is private and reachable only through its plain getter/setter pair, none of which validates or copies its argument. NOTE(review): who populates and consumes this object is not visible here; confirm against the calling code. */ public class SendMessageContext { - private String producerGroup; - private String topic; - private String msgId; - private String originMsgId; - /** Boxed Integer with no initializer, so it is null until setQueueId stores a value. */ private Integer queueId; - /** Boxed Long with no initializer, so it is null until setQueueOffset stores a value. */ private Long queueOffset; - private String brokerAddr; - private String bornHost; - private int bodyLength; - private int code; - private String errorMsg; - private String msgProps; - private Object mqTraceContext; - private Properties extProps; - private String brokerRegionId; - private String msgUniqueKey; - private long bornTimeStamp; - /** Pre-set to MessageType.Trans_msg_Commit; setMsgType replaces it. NOTE(review): the reason for this particular default is not visible here. */ private MessageType msgType = MessageType.Trans_msg_Commit; - /** Starts as false; only setSuccess changes it. */ private boolean isSuccess = false; - //For Commercial - private String commercialOwner; - private BrokerStatsManager.StatsType commercialSendStats; - private int commercialSendSize; - private int commercialSendTimes; - - public boolean isSuccess() { - return isSuccess; - } - - public void setSuccess(final boolean success) { - isSuccess = success; - } - - public MessageType getMsgType() { - return msgType; - } - - public void setMsgType(final MessageType msgType) { - this.msgType = msgType; - } - - public String getMsgUniqueKey() { - return msgUniqueKey; - } - - public void setMsgUniqueKey(final String msgUniqueKey) { - this.msgUniqueKey = msgUniqueKey; - } - - public long getBornTimeStamp() { - return bornTimeStamp; - } - - public void setBornTimeStamp(final long bornTimeStamp) { - this.bornTimeStamp = bornTimeStamp; - } - - public String getBrokerRegionId() { - return brokerRegionId; - } - - public void setBrokerRegionId(final String brokerRegionId) { - this.brokerRegionId = brokerRegionId; - } - - public String getProducerGroup() { - return producerGroup; - } - - - public void 
setProducerGroup(String producerGroup) { - this.producerGroup = producerGroup; - } - - - public String getTopic() { - return topic; - } - - - public void setTopic(String topic) { - this.topic = topic; - } - - - public String getMsgId() { - return msgId; - } - - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - - public String getOriginMsgId() { - return originMsgId; - } - - - public void setOriginMsgId(String originMsgId) { - this.originMsgId = originMsgId; - } - - - /** Returns the boxed queue id exactly as stored; the result is null when no value was ever set. */ public Integer getQueueId() { - return queueId; - } - - - public void setQueueId(Integer queueId) { - this.queueId = queueId; - } - - - /** Returns the boxed queue offset exactly as stored; the result is null when no value was ever set. */ public Long getQueueOffset() { - return queueOffset; - } - - - public void setQueueOffset(Long queueOffset) { - this.queueOffset = queueOffset; - } - - - public String getBrokerAddr() { - return brokerAddr; - } - - - public void setBrokerAddr(String brokerAddr) { - this.brokerAddr = brokerAddr; - } - - - public String getBornHost() { - return bornHost; - } - - - public void setBornHost(String bornHost) { - this.bornHost = bornHost; - } - - - public int getBodyLength() { - return bodyLength; - } - - - public void setBodyLength(int bodyLength) { - this.bodyLength = bodyLength; - } - - - public int getCode() { - return code; - } - - - public void setCode(int code) { - this.code = code; - } - - - public String getErrorMsg() { - return errorMsg; - } - - - public void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; - } - - - public String getMsgProps() { - return msgProps; - } - - - public void setMsgProps(String msgProps) { - this.msgProps = msgProps; - } - - - /** Returns the stored trace object typed only as Object; its concrete type is not visible in this class. */ public Object getMqTraceContext() { - return mqTraceContext; - } - - - public void setMqTraceContext(Object mqTraceContext) { - this.mqTraceContext = mqTraceContext; - } - - - /** Returns the stored Properties reference itself, not a copy, so caller mutations are shared with this object. */ public Properties getExtProps() { - return extProps; - } - - - public void setExtProps(Properties extProps) { - this.extProps = extProps; - } - - public String getCommercialOwner() { - return commercialOwner; - } - 
- public void setCommercialOwner(final String commercialOwner) { - this.commercialOwner = commercialOwner; - } - - /** Returns the stored BrokerStatsManager.StatsType; the field has no initializer, so the result is null until setCommercialSendStats is called. */ public BrokerStatsManager.StatsType getCommercialSendStats() { - return commercialSendStats; - } - - public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) { - this.commercialSendStats = commercialSendStats; - } - - public int getCommercialSendSize() { - return commercialSendSize; - } - - public void setCommercialSendSize(final int commercialSendSize) { - this.commercialSendSize = commercialSendSize; - } - - public int getCommercialSendTimes() { - return commercialSendTimes; - } - - public void setCommercialSendTimes(final int commercialSendTimes) { - this.commercialSendTimes = commercialSendTimes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java deleted file mode 100644 index e079b9f..0000000 --- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.broker.mqtrace; - -/** Hook contract declaring a name accessor plus a before/after callback pair, each callback receiving a SendMessageContext. NOTE(review): the explicit public modifiers on the methods are redundant because interface methods are implicitly public; who invokes the callbacks is not visible here, confirm against the calling code. */ public interface SendMessageHook { - /** Returns the name of this hook as a String. NOTE(review): presumably used to identify the hook in logs or registries; confirm. */ public String hookName(); - - - /** Callback that receives the send context; by its name it is meant to run before the send step. NOTE(review): confirm ordering at the call site. */ public void sendMessageBefore(final SendMessageContext context); - - - /** Callback that receives the send context; by its name it is meant to run after the send step. NOTE(review): confirm ordering at the call site. */ public void sendMessageAfter(final SendMessageContext context); -}
