1282009188 commented on a change in pull request #358: [ISSUE #292] Add support of transactional message feature URL: https://github.com/apache/rocketmq/pull/358#discussion_r205724085
########## File path: broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.transaction.queue; + +import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener; +import org.apache.rocketmq.broker.transaction.OperationResult; +import org.apache.rocketmq.broker.transaction.TransactionalMessageService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class TransactionalMessageServiceImpl implements TransactionalMessageService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); + + private TransactionalMessageBridge transactionalMessageBridge; + + private static final int PULL_MSG_RETRY_NUMBER = 1; + + private static final int MAX_PROCESS_TIME_LIMIT = 60000; + + private static final int MAX_RETRY_COUNT_WHEN_HALF_NULL = 1; + + public TransactionalMessageServiceImpl(TransactionalMessageBridge transactionBridge) { + this.transactionalMessageBridge = transactionBridge; + } + + private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>(); + + @Override + public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { + return transactionalMessageBridge.putHalfMessage(messageInner); + } + + private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) { + String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES); + int checkTime = 1; + if (null != checkTimes) { + checkTime = getInt(checkTimes); + if (checkTime >= transactionCheckMax) { + return true; + } else { + checkTime++; + } + } + msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime)); + return false; + } + + private boolean needSkip(MessageExt msgExt) { + long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); + if (valueOfCurrentMinusBorn + > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime() + * 3600L * 1000) { + log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}", + msgExt.getMsgId(), msgExt.getBornTimestamp()); + return true; + } + return false; + } + + private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) { + PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt); + if (putMessageResult != null + && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + msgExt.setQueueOffset( + putMessageResult.getAppendMessageResult().getLogicsOffset()); + msgExt.setCommitLogOffset( + putMessageResult.getAppendMessageResult().getWroteOffset()); + msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); + log.info( + "Send check message, the offset={} restored in queueOffset={} " + + "commitLogOffset={} " + + "newMsgId={} realMsgId={} topic={}", + offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(), + msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), + msgExt.getTopic()); + return true; + } else { + log.error( + "PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, " + + "msgId: {}", + msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId()); + return false; + } + } + + @Override + public void check(long transactionTimeout, int transactionCheckMax, + AbstractTransactionalMessageCheckListener listener) { + try { + String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC; + Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); + if (msgQueues == null || msgQueues.size() == 0) { + log.warn("The queue of topic is empty :" + topic); + return; + } + log.info("Check topic={}, queues={}", topic, msgQueues); + for (MessageQueue messageQueue : msgQueues) { + long startTime = System.currentTimeMillis(); + MessageQueue opQueue = getOpQueue(messageQueue); + long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); + long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); + log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); + if (halfOffset < 0 || opOffset < 0) { + log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, + halfOffset, opOffset); + continue; + } + + List<Long> doneOpOffset = new ArrayList<>(); + HashMap<Long, Long> removeMap = new HashMap<>(); + PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); + if (null == pullResult) { + log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", + messageQueue, halfOffset, opOffset); + continue; + } + // single thread + int getMessageNullCount = 1; + long newOffset = halfOffset; + long i = halfOffset; + while (true) { + if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { + log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); + break; + } + if (removeMap.containsKey(i)) { + log.info("Half offset {} has been committed/rolled back", i); + removeMap.remove(i); + } else { + GetResult getResult = getHalfMsg(messageQueue, i); + MessageExt msgExt = getResult.getMsg(); + if (msgExt == null) { + if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { + break; + } + if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { + log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, + messageQueue, getMessageNullCount, getResult.getPullResult()); + break; + } else { + log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", + i, messageQueue, getMessageNullCount, getResult.getPullResult()); + i = getResult.getPullResult().getNextBeginOffset(); + newOffset = i; + continue; + } + } + + if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { + listener.resolveDiscardMsg(msgExt); + newOffset = i + 1; + i++; + continue; + } + if (msgExt.getStoreTimestamp() >= startTime) { + log.info("Fresh stored. the miss offset={}, check it later, store={}", i, + new Date(msgExt.getStoreTimestamp())); + break; + } + + long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); + long checkImmunityTime = transactionTimeout; + String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); + if (null != checkImmunityTimeStr) { + checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); + if (valueOfCurrentMinusBorn < checkImmunityTime) { + if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { + newOffset = i + 1; + i++; + continue; + } + } + } else { + if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { + log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, + checkImmunityTime, new Date(msgExt.getBornTimestamp())); + break; + } + } + List<MessageExt> opMsg = pullResult.getMsgFoundList(); + boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) + || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) + || (valueOfCurrentMinusBorn <= -1); + + if (isNeedCheck) { + if (!putBackHalfMsgQueue(msgExt, i)) { + continue; + } + listener.resolveHalfMsg(msgExt); + } else { + pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); + log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, + messageQueue, pullResult); + continue; + } + } + newOffset = i + 1; + i++; + } + if (newOffset != halfOffset) { + transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); + } + long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); + if (newOpOffset != opOffset) { + transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); + } + } + } catch (Exception e) { + e.printStackTrace(); + log.error("Check error", e); + } + + } + + private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) { + long checkImmunityTime; + + checkImmunityTime = getLong(checkImmunityTimeStr); + if (-1 == checkImmunityTime) { + checkImmunityTime = transactionTimeout; + } else { + checkImmunityTime *= 1000; + } + return checkImmunityTime; + } + + /** + * Read op message, parse op message, and fill removeMap + * + * @param removeMap Half message to be remove, key:halfOffset, value: opOffset. + * @param opQueue Op message queue. + * @param pullOffsetOfOp The begin offset of op message queue. + * @param miniOffset The current minimum offset of half message queue. + * @param doneOpOffset Stored op messages that have been processed. + * @return Op message result. + */ + private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, + MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) { + PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32); + if (null == pullResult) { + return null; + } + if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL + || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) { + log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue, + pullResult); + transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset()); + return pullResult; + } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { + log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue, + pullResult); + return pullResult; + } + List<MessageExt> opMsg = pullResult.getMsgFoundList(); + if (opMsg == null) { + log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult); + return pullResult; + } + for (MessageExt opMessageExt : opMsg) { + Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); + log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), + opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset); Review comment: queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); why this can get the value of HalfOffset? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
