GitHub user chenzhixiu edited a discussion: 会不会出现 commitlog 文件写消息的时候 maxBlank 小于 8,导致写入异常?
`public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { byteBuffer.mark(); //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue information Long queueOffset = messageExtBatch.getQueueOffset(); long beginQueueOffset = queueOffset; int totalMsgLen = 0; int msgNum = 0; final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff(); int sysFlag = messageExtBatch.getSysFlag(); int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; Supplier<String> msgIdSupplier = () -> { int msgIdLen = storeHostLength + 8; int batchCount = putMessageContext.getBatchSize(); long[] phyPosArray = putMessageContext.getPhyPos(); ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer); msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1); for (int i = 0; i < phyPosArray.length; i++) { msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]); String msgId = UtilAll.bytes2string(msgIdBuffer.array()); if (i != 0) { buffer.append(','); } buffer.append(msgId); } return buffer.toString(); }; messagesByteBuff.mark(); int index = 0; while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final int msgPos = messagesByteBuff.position(); final int msgLen = messagesByteBuff.getInt(); totalMsgLen += msgLen; // Determines whether there is sufficient free space if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE 
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value //ignore previous read messagesByteBuff.reset(); // Here the length of the specially set maxBlank byteBuffer.reset(); //ignore the previous appended messages byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //move to add queue offset and commitlog offset int pos = msgPos + 20; messagesByteBuff.putLong(pos, queueOffset); pos += 8; messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + bornHostLength; // refresh store time stamp in lock messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen); } messagesByteBuff.position(0); messagesByteBuff.limit(totalMsgLen); byteBuffer.put(messagesByteBuff); messageExtBatch.setEncodedBuff(null); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); return result; } }` GitHub link: https://github.com/apache/rocketmq/discussions/7043 ---- This is an automatically sent email for dev@rocketmq.apache.org. To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org