http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java new file mode 100644 index 0000000..60b3fff --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.stats; + +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.constant.LoggerName; +import com.alibaba.rocketmq.common.stats.MomentStatsItemSet; +import com.alibaba.rocketmq.common.stats.StatsItem; +import com.alibaba.rocketmq.common.stats.StatsItemSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + + +public class BrokerStatsManager { + + public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS"; + public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE"; + public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS"; + public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE"; + public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS"; + public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS"; + public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS"; + public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS"; + public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE"; + public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS"; + public static final String BROKER_GET_FROM_DISK_SIZE = "BROKER_GET_FROM_DISK_SIZE"; + // For commercial + public static final String COMMERCIAL_SEND_TIMES = "COMMERCIAL_SEND_TIMES"; + public static final String COMMERCIAL_SNDBCK_TIMES = "COMMERCIAL_SNDBCK_TIMES"; + public static final String COMMERCIAL_RCV_TIMES = "COMMERCIAL_RCV_TIMES"; + public static final String COMMERCIAL_RCV_EPOLLS = "COMMERCIAL_RCV_EPOLLS"; + public static final String COMMERCIAL_SEND_SIZE = "COMMERCIAL_SEND_SIZE"; + public static final String COMMERCIAL_RCV_SIZE = "COMMERCIAL_RCV_SIZE"; + public static final String COMMERCIAL_PERM_FAILURES = "COMMERCIAL_PERM_FAILURES"; + public static final String COMMERCIAL_OWNER = "Owner"; + // Message Size limit for one api-calling count. + public static final double SIZE_PER_COUNT = 64 * 1024; + + public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE"; + public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME"; + // Pull Message Latency + public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY"; + + /** + * read disk follow stats + */ + private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME); + private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "BrokerStatsThread")); + private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "CommercialStatsThread")); + private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>(); + private final String clusterName; + private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log); + private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log); + + public BrokerStatsManager(String clusterName) { + this.clusterName = clusterName; + + this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log)); + this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(GROUP_GET_SIZE, new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, log)); + this.statsTable.put(GROUP_GET_LATENCY, new StatsItemSet(GROUP_GET_LATENCY, this.scheduledExecutorService, log)); + this.statsTable.put(SNDBCK_PUT_NUMS, new StatsItemSet(SNDBCK_PUT_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(BROKER_PUT_NUMS, new StatsItemSet(BROKER_PUT_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(BROKER_GET_NUMS, new StatsItemSet(BROKER_GET_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(GROUP_GET_FROM_DISK_NUMS, new StatsItemSet(GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(GROUP_GET_FROM_DISK_SIZE, new StatsItemSet(GROUP_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log)); + this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log)); + + + this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_RCV_SIZE, new StatsItemSet(COMMERCIAL_RCV_SIZE, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_RCV_EPOLLS, new StatsItemSet(COMMERCIAL_RCV_EPOLLS, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_SNDBCK_TIMES, new StatsItemSet(COMMERCIAL_SNDBCK_TIMES, this.commercialExecutor, COMMERCIAL_LOG)); + this.statsTable.put(COMMERCIAL_PERM_FAILURES, new StatsItemSet(COMMERCIAL_PERM_FAILURES, this.commercialExecutor, COMMERCIAL_LOG)); + } + + public MomentStatsItemSet getMomentStatsItemSetFallSize() { + return momentStatsItemSetFallSize; + } + + public MomentStatsItemSet getMomentStatsItemSetFallTime() { + return momentStatsItemSetFallTime; + } + + public void start() { + } + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + } + + public StatsItem getStatsItem(final String statsName, final String statsKey) { + try { + return this.statsTable.get(statsName).getStatsItem(statsKey); + } catch (Exception e) { + } + + return null; + } + + public void incTopicPutNums(final String topic) { + this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); + } + + public void incTopicPutSize(final String topic, final int size) { + this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); + } + + public void incGroupGetNums(final String group, final String topic, final int incValue) { + final String statsKey = buildStatsKey(topic, group); + this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1); + } + + public String buildStatsKey(String topic, String group) { + StringBuffer strBuilder = new StringBuffer(); + strBuilder.append(topic); + strBuilder.append("@"); + strBuilder.append(group); + return strBuilder.toString(); + } + + public void incGroupGetSize(final String group, final String topic, final int incValue) { + final String statsKey = buildStatsKey(topic, group); + this.statsTable.get(GROUP_GET_SIZE).addValue(statsKey, incValue, 1); + } + + public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) { + final String statsKey = String.format("%d@%s@%s", queueId, topic, group); + this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1); + } + + + public void incBrokerPutNums() { + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); + } + + + public void incBrokerGetNums(final int incValue) { + this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + } + + + public void incSendBackNums(final String group, final String topic) { + final String statsKey = buildStatsKey(topic, group); + this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1); + } + + + public double tpsGroupGetNums(final String group, final String topic) { + final String statsKey = buildStatsKey(topic, group); + return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); + } + + + public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) { + final String statsKey = String.format("%d@%s@%s", queueId, topic, group); + this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); + } + + + public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) { + final String statsKey = String.format("%d@%s@%s", queueId, topic, group); + this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); + } + + public void incCommercialValue(final String key, final String owner, final String group, + final String topic, final String type, final int incValue) { + final String statsKey = buildCommercialStatsKey(owner, topic, group, type); + this.statsTable.get(key).addValue(statsKey, incValue, 1); + } + + public String buildCommercialStatsKey(String owner, String topic, String group, String type) { + StringBuffer strBuilder = new StringBuffer(); + strBuilder.append(owner); + strBuilder.append("@"); + strBuilder.append(topic); + strBuilder.append("@"); + strBuilder.append(group); + strBuilder.append("@"); + strBuilder.append(type); + return strBuilder.toString(); + } + + + public enum StatsType { + SEND_SUCCESS, + SEND_FAILURE, + SEND_BACK, + SEND_TIMER, + SEND_TRANSACTION, + RCV_SUCCESS, + RCV_EPOLLS, + PERM_FAILURE + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java new file mode 100644 index 0000000..7a3b37c --- /dev/null +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.store.util; + +import com.sun.jna.Library; +import com.sun.jna.Native; +import com.sun.jna.NativeLong; +import com.sun.jna.Platform; +import com.sun.jna.Pointer; + + +public interface LibC extends Library { + LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class); + + int MADV_WILLNEED = 3; + int MADV_DONTNEED = 4; + + int MCL_CURRENT = 1; + int MCL_FUTURE = 2; + int MCL_ONFAULT = 4; + + /* sync memory asynchronously */ + int MS_ASYNC = 0x0001; + /* invalidate mappings & caches */ + int MS_INVALIDATE = 0x0002; + /* synchronous memory sync */ + int MS_SYNC = 0x0004; + + int mlock(Pointer var1, NativeLong var2); + + int munlock(Pointer var1, NativeLong var2); + + int madvise(Pointer var1, NativeLong var2, int var3); + + Pointer memset(Pointer p, int v, long len); + + int mlockall(int flags); + + int msync(Pointer p, NativeLong length, int flags); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java new file mode 100644 index 0000000..ac577c7 --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.store.config.FlushDiskType; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + + +/** + * @author shijia.wxr + */ +public class DefaultMessageStoreTest { + private static final String StoreMessage = "Once, there was a chance for me!"; + + private static int QUEUE_TOTAL = 100; + + private static AtomicInteger QueueId = new AtomicInteger(0); + + private static SocketAddress BornHost; + + private static SocketAddress StoreHost; + + private static byte[] MessageBody; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void test_write_read() throws Exception { + System.out.println("================================================================"); + long totalMsgs = 100; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null); + + boolean load = master.load(); + assertTrue(load); + + master.start(); + for (long i = 0; i < totalMsgs; i++) { + PutMessageResult result = master.putMessage(buildMessage()); + System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); + } + + for (long i = 0; i < totalMsgs; i++) { + try { + GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); + if (result == null) { + System.out.println("result == null " + i); + } + assertTrue(result != null); + result.release(); + System.out.println("read " + i + " OK"); + } catch (Exception e) { + e.printStackTrace(); + } + + } + master.shutdown(); + master.destroy(); + System.out.println("================================================================"); + } + + public MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic("AAA"); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(MessageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + msg.setSysFlag(4); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + return msg; + } + + @Test + public void test_group_commit() throws Exception { + System.out.println("================================================================"); + long totalMsgs = 100; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null); + boolean load = master.load(); + assertTrue(load); + + master.start(); + for (long i = 0; i < totalMsgs; i++) { + PutMessageResult result = master.putMessage(buildMessage()); + System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); + } + + for (long i = 0; i < totalMsgs; i++) { + try { + GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); + if (result == null) { + System.out.println("result == null " + i); + } + assertTrue(result != null); + result.release(); + System.out.println("read " + i + " OK"); + } catch (Exception e) { + e.printStackTrace(); + } + + } + master.shutdown(); + master.destroy(); + System.out.println("================================================================"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java new file mode 100644 index 0000000..ce7666f --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store; + +import org.junit.*; + +import static org.junit.Assert.*; + + +public class MappedFileQueueTest { + + // private static final String StoreMessage = + // "Once, there was a chance for me! but I did not treasure it. if"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void test_getLastMapedFile() { + final String fixedMsg = "0123456789abcdef"; + System.out.println("================================================================"); + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/a/", 1024, null); + + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); + if (!result) { + System.out.println("appendMessage " + i); + } + assertTrue(result); + } + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + System.out.println("MappedFileQueue.getLastMappedFile() OK"); + } + + + @Test + public void test_findMapedFileByOffset() { + final String fixedMsg = "abcd"; + System.out.println("================================================================"); + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/b/", 1024, null); + + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); + // System.out.println("appendMessage " + bytes); + assertTrue(result); + } + + MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 0); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(100); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 0); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 1024); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 1024); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100); + assertTrue(mappedFile != null); + assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); + System.out.println(mappedFile.getFileFromOffset()); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4); + assertTrue(mappedFile == null); + + mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4 + 100); + assertTrue(mappedFile == null); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + System.out.println("MappedFileQueue.findMappedFileByOffset() OK"); + } + + @Test + public void test_commit() { + final String fixedMsg = "0123456789abcdef"; + System.out.println("================================================================"); + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/c/", 1024, null); + + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); + assertTrue(result); + } + + boolean result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere()); + System.out.println("1 " + result + " " + mappedFileQueue.getFlushedWhere()); + + result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere()); + System.out.println("2 " + result + " " + mappedFileQueue.getFlushedWhere()); + + result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere()); + System.out.println("3 " + result + " " + mappedFileQueue.getFlushedWhere()); + + result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere()); + System.out.println("4 " + result + " " + mappedFileQueue.getFlushedWhere()); + + result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere()); + System.out.println("5 " + result + " " + mappedFileQueue.getFlushedWhere()); + + result = mappedFileQueue.flush(0); + assertFalse(result); + assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere()); + System.out.println("6 " + result + " " + mappedFileQueue.getFlushedWhere()); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + System.out.println("MappedFileQueue.flush() OK"); + } + + @Test + public void test_getMapedMemorySize() { + final String fixedMsg = "abcd"; + System.out.println("================================================================"); + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/d/", 1024, null); + + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); + assertTrue(result); + } + + assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize()); + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + System.out.println("MappedFileQueue.getMappedMemorySize() OK"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java new file mode 100644 index 0000000..94fd5ee --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; + + +public class MappedFileTest { + + private static final String StoreMessage = "Once, there was a chance for me!"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void test_write_read() { + try { + MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64); + boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); + assertTrue(result); + System.out.println("write OK"); + + SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); + byte[] data = new byte[StoreMessage.length()]; + selectMappedBufferResult.getByteBuffer().get(data); + String readString = new String(data); + + System.out.println("Read: " + readString); + assertTrue(readString.equals(StoreMessage)); + + mappedFile.shutdown(1000); + assertTrue(!mappedFile.isAvailable()); + selectMappedBufferResult.release(); + assertTrue(mappedFile.isCleanupOver()); + assertTrue(mappedFile.destroy(1000)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Ignore + public void test_jvm_crashed() { + try { + MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64); + boolean result = mappedFile.appendMessage(StoreMessage.getBytes()); + assertTrue(result); + System.out.println("write OK"); + + SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); + selectMappedBufferResult.release(); + mappedFile.shutdown(1000); + + byte[] data = new byte[StoreMessage.length()]; + selectMappedBufferResult.getByteBuffer().get(data); + String readString = new String(data); + System.out.println(readString); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java new file mode 100644 index 0000000..ea83375 --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: RecoverTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store; + +import com.alibaba.rocketmq.common.message.MessageDecoder; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + + +public class RecoverTest { + private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa"; + + private static int QUEUE_TOTAL = 10; + + private static AtomicInteger QueueId = new AtomicInteger(0); + + private static SocketAddress BornHost; + + private static SocketAddress StoreHost; + + private static byte[] MessageBody; + private MessageStore storeWrite1; + private MessageStore storeWrite2; + private MessageStore storeRead; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void test_recover_normally() throws Exception { + this.writeMessage(true, true); + Thread.sleep(1000 * 3); + this.readMessage(1000); + this.destroy(); + } + + public void writeMessage(boolean normal, boolean first) throws Exception { + System.out.println("================================================================"); + long totalMsgs = 100; + QUEUE_TOTAL = 3; + + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); + messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20); + messageStoreConfig.setMessageIndexEnable(false); + + MessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null); + if (first) { + this.storeWrite1 = messageStore; + } else { + this.storeWrite2 = messageStore; + } + + boolean loadResult = messageStore.load(); + assertTrue(loadResult); + messageStore.start(); + for (long i = 0; i < totalMsgs; i++) { + PutMessageResult result = messageStore.putMessage(buildMessage()); + System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); + } + + if (normal) { + messageStore.shutdown(); + } + System.out.println("========================writeMessage OK========================================"); + } + + public void readMessage(final long msgCnt) throws Exception { + System.out.println("================================================================"); + QUEUE_TOTAL = 3; + MessageBody = StoreMessage.getBytes(); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); + messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20); + messageStoreConfig.setMessageIndexEnable(false); + storeRead = new DefaultMessageStore(messageStoreConfig, null, null, null); + boolean loadResult = storeRead.load(); + assertTrue(loadResult); + storeRead.start(); + + long readCnt = 0; + for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) { + for (long offset = 0; ; ) { + GetMessageResult result = storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, offset, 1024 * 1024, null); + if (result.getStatus() == GetMessageStatus.FOUND) { + System.out.println(queueId + "\t" + result.getMessageCount()); + this.veryReadMessage(queueId, offset, result.getMessageBufferList()); + offset += result.getMessageCount(); + readCnt += result.getMessageCount(); + result.release(); + } else { + break; + } + } + } + + System.out.println("readCnt = " + readCnt); + assertTrue(readCnt == msgCnt); + System.out.println("========================readMessage OK========================================"); + } + + private void destroy() { + if (storeWrite1 != null) { + storeWrite1.shutdown(); + storeWrite1.destroy(); + } + + if (storeWrite2 != null) { + storeWrite2.shutdown(); + storeWrite2.destroy(); + } + + if (storeRead != null) { + storeRead.shutdown(); + storeRead.destroy(); + } + } + + public MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic("TOPIC_A"); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(MessageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + msg.setSysFlag(4); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + + return msg; + } + + private void veryReadMessage(int queueId, long queueOffset, List<ByteBuffer> byteBuffers) { + for (ByteBuffer byteBuffer : byteBuffers) { + MessageExt msg = MessageDecoder.decode(byteBuffer); + System.out.println("request queueId " + queueId + ", request queueOffset " + queueOffset + " msg queue offset " + + msg.getQueueOffset()); + + assertTrue(msg.getQueueOffset() == queueOffset); + + queueOffset++; + } + } + + @Test + public void test_recover_normally_write() throws Exception { + this.writeMessage(true, true); + Thread.sleep(1000 * 3); + this.writeMessage(true, false); + Thread.sleep(1000 * 3); + this.readMessage(2000); + this.destroy(); + } + + @Test + public void test_recover_abnormally() throws Exception { + this.writeMessage(false, true); + Thread.sleep(1000 * 3); + this.readMessage(1000); + this.destroy(); + } + + @Test + public void test_recover_abnormally_write() throws Exception { + this.writeMessage(false, true); + Thread.sleep(1000 * 3); + this.writeMessage(false, false); + Thread.sleep(1000 * 3); + this.readMessage(2000); + this.destroy(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java new file mode 100644 index 0000000..e0a550d --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +public class StoreCheckpointTest { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void test_write_read() { + try { + StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000"); + long physicMsgTimestamp = 0xAABB; + long logicsMsgTimestamp = 0xCCDD; + storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp); + storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp); + storeCheckpoint.flush(); + + long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp(); + assertTrue(diff == 3000); + storeCheckpoint.shutdown(); + storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000"); + assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp()); + assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp()); + } catch (Throwable e) { + e.printStackTrace(); + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java new file mode 100644 index 0000000..288b87e --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store.index; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class IndexFileTest { + private static final int hashSlotNum = 100; + private static final int indexNum = 400; + + @Test + public void test_put_index() { + try { + IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0); + for (long i = 0; i < (indexNum - 1); i++) { + boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); + assertTrue(putResult); + } + + boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); + assertFalse(putResult); + + indexFile.destroy(0); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); + } + } + + + @Test + public void test_put_get_index() { + try { + IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0); + + for (long i = 0; i < (indexNum - 1); i++) { + boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); + assertTrue(putResult); + } + boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); + assertFalse(putResult); + + final List<Long> phyOffsets = new ArrayList<Long>(); + indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true); + for (Long offset : phyOffsets) { + System.out.println(offset); + } + assertFalse(phyOffsets.isEmpty()); + indexFile.destroy(0); + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java new file mode 100644 index 0000000..d7de738 --- /dev/null +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ + */ +package com.alibaba.rocketmq.store.schedule; + +import com.alibaba.rocketmq.store.*; +import com.alibaba.rocketmq.store.config.MessageStoreConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + +@Ignore +public class ScheduleMessageTest { + private static final String StoreMessage = "Once, there was a chance for me!"; + + private static int QUEUE_TOTAL = 100; + + private static AtomicInteger QueueId = new AtomicInteger(0); + + private static SocketAddress BornHost; + + private static SocketAddress StoreHost; + + private static byte[] MessageBody; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void test_delay_message() throws Exception { + System.out.println("================================================================"); + long totalMsgs = 10000; + QUEUE_TOTAL = 32; + + + MessageBody = StoreMessage.getBytes(); + + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(1000 * 10); + + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null); + + boolean load = master.load(); + assertTrue(load); + + + master.start(); + for (int i = 0; i < totalMsgs; i++) { + MessageExtBrokerInner msg = buildMessage(); + msg.setDelayTimeLevel(i % 4); + + PutMessageResult result = master.putMessage(msg); + System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); + } + + System.out.println("write message over, wait time up"); + Thread.sleep(1000 * 20); + + + for (long i = 0; i < totalMsgs; i++) { + try { + GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); + if (result == null) { + System.out.println("result == null " + i); + } + assertTrue(result != null); + result.release(); + System.out.println("read " + i + " OK"); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + Thread.sleep(1000 * 15); + + + master.shutdown(); + + + master.destroy(); + System.out.println("================================================================"); + } + + public MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic("AAA"); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(MessageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + msg.setSysFlag(4); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + + return msg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-tools/pom.xml b/rocketmq-tools/pom.xml new file mode 100644 index 0000000..5070a68 --- /dev/null +++ b/rocketmq-tools/pom.xml @@ -0,0 +1,66 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain producerGroup copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>com.alibaba.rocketmq</groupId> + <artifactId>rocketmq-all</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>rocketmq-tools</artifactId> + <name>rocketmq-tools ${project.version}</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-client</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-store</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>rocketmq-srvutil</artifactId> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java new file mode 100644 index 0000000..4576886 --- /dev/null +++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -0,0 +1,458 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.tools.admin; + +import com.alibaba.rocketmq.client.ClientConfig; +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.TopicConfig; +import com.alibaba.rocketmq.common.admin.ConsumeStats; +import com.alibaba.rocketmq.common.admin.RollbackStats; +import com.alibaba.rocketmq.common.admin.TopicStatsTable; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.*; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.exception.*; +import com.alibaba.rocketmq.tools.admin.api.MessageTrack; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { + private final DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private String adminExtGroup = "admin_ext_group"; + private String createTopicKey = MixAll.DEFAULT_TOPIC; + private long timeoutMillis = 5000; + + public DefaultMQAdminExt() { + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis); + } + + public DefaultMQAdminExt(long timeoutMillis) { + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis); + } + + public DefaultMQAdminExt(RPCHook rpcHook) { + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis); + } + + public DefaultMQAdminExt(RPCHook rpcHook, long timeoutMillis) { + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis); + } + + public DefaultMQAdminExt(final String adminExtGroup) { + this.adminExtGroup = adminExtGroup; + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis); + } + + public DefaultMQAdminExt(final String adminExtGroup, long timeoutMillis) { + this.adminExtGroup = adminExtGroup; + this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return defaultMQAdminExtImpl.searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return defaultMQAdminExtImpl.maxOffset(mq); + } + + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return defaultMQAdminExtImpl.minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return defaultMQAdminExtImpl.earliestMsgStoreTime(mq); + } + + @Override + public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.viewMessage(offsetMsgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, + InterruptedException { + return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public void start() throws MQClientException { + defaultMQAdminExtImpl.start(); + } + + @Override + public void shutdown() { + defaultMQAdminExtImpl.shutdown(); + } + + @Override + public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties); + } + + @Override + public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.getBrokerConfig(brokerAddr); + } + + @Override + public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config); + } + + @Override + public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config); + } + + @Override + public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) { + return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group); + } + + @Override + public TopicConfig examineTopicConfig(String addr, String topic) { + return defaultMQAdminExtImpl.examineTopicConfig(addr, topic); + } + + @Override + public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + return defaultMQAdminExtImpl.examineTopicStats(topic); + } + + @Override + public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.fetchAllTopicList(); + } + + @Override + public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName); + } + + @Override + public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + return this.defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + return examineConsumeStats(consumerGroup, null); + } + + @Override + public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, + InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic); + } + + @Override + public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { + return defaultMQAdminExtImpl.examineBrokerClusterInfo(); + } + + @Override + public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.examineTopicRouteInfo(topic); + } + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException, + RemotingException, MQClientException { + return defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup); + } + + @Override + public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic); + } + + @Override + public List<String> getNameServerAddressList() { + return this.defaultMQAdminExtImpl.getNameServerAddressList(); + } + + @Override + public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException, + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName); + } + + @Override + public void putKVConfig(String namespace, String key, String value) { + defaultMQAdminExtImpl.putKVConfig(namespace, key, value); + } + + @Override + public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.getKVConfig(namespace, key); + } + + @Override + public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.getKVListByNamespace(namespace); + } + + @Override + public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic); + } + + @Override + public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic); + } + + @Override + public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName); + } + + @Override + public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value); + } + + @Override + public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteKvConfig(namespace, key); + } + + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + } + + @Override + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return resetOffsetByTimestamp(topic, group, timestamp, isForce, false); + } + + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC); + } + + @Override + public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp); + } + + @Override + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr); + } + + @Override + public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster); + } + + @Override + public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException, + MQClientException { + return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic); + } + + @Override + public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + RemotingException, MQClientException { + return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group); + } + + @Override + public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster); + } + + @Override + public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr); + } + + @Override + public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(cluster); + } + + @Override + public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(addr); + } + + @Override + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException, + MQClientException, InterruptedException { + return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + } + + @Override + public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + MQBrokerException { + return this.defaultMQAdminExtImpl.messageTrackDetail(msg); + } + + @Override + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, + MQClientException, InterruptedException, MQBrokerException { + this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline); + } + + @Override + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey); + } + + @Override + public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.getClusterList(topic); + } + + @Override + public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQClientException, InterruptedException { + return this.defaultMQAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis); + } + + @Override + public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { + return this.defaultMQAdminExtImpl.getTopicClusterList(topic); + } + + @Override + public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis); + } + + + /* (non-Javadoc) + * @see com.alibaba.rocketmq.client.MQAdmin#queryMessageByUniqKey(java.lang.String, java.lang.String) + */ + @Override + public MessageExt viewMessage(String topic, String msgId) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQAdminExtImpl.viewMessage(topic, msgId); + } + + public String getAdminExtGroup() { + return adminExtGroup; + } + + public void setAdminExtGroup(String adminExtGroup) { + this.adminExtGroup = adminExtGroup; + } + + public String getCreateTopicKey() { + return createTopicKey; + } + + public void setCreateTopicKey(String createTopicKey) { + this.createTopicKey = createTopicKey; + } + + @Override + public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException { + this.defaultMQAdminExtImpl.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset); + } + + @Override + public void updateNameServerConfig(final Properties properties, final List<String> nameServers) + throws InterruptedException, RemotingConnectException, + UnsupportedEncodingException, MQBrokerException, RemotingTimeoutException, + MQClientException, RemotingSendRequestException { + this.defaultMQAdminExtImpl.updateNameServerConfig(properties, nameServers); + } + + @Override + public Map<String, Properties> getNameServerConfig(final List<String> nameServers) + throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQClientException, + UnsupportedEncodingException { + return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers); + } +}
