Repository: incubator-rocketmq Updated Branches: refs/heads/develop bcc65e547 -> aa1c75774
[ROCKETMQ-254]Fix logger appender unit tests which cost too long Author: lindzh <linso...@163.com> Closes #141 from lindzh/logger_appender_test. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/aa1c7577 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/aa1c7577 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/aa1c7577 Branch: refs/heads/develop Commit: aa1c75774eb33fca9a1c3f036249b6c5b86ef0f5 Parents: bcc65e5 Author: lindzh <linso...@163.com> Authored: Fri Aug 11 15:40:24 2017 +0800 Committer: lollipop <lolli...@apache.org> Committed: Fri Aug 11 15:40:24 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerControllerTest.java | 9 ++ .../filter/ConsumerFilterManagerTest.java | 20 +-- .../filter/MessageStoreWithFilterTest.java | 21 +-- .../org/apache/rocketmq/common/UtilAll.java | 15 ++ .../logappender/common/ProducerInstance.java | 30 ++-- .../log4j/RocketmqLog4jAppender.java | 4 +- .../log4j2/RocketmqLog4j2Appender.java | 4 +- .../logback/RocketmqLogbackAppender.java | 4 +- .../rocketmq/logappender/AbstractTestCase.java | 151 +++++-------------- .../apache/rocketmq/logappender/Log4jTest.java | 7 +- .../rocketmq/logappender/LogbackTest.java | 7 +- .../apache/rocketmq/logappender/log4j2Test.java | 7 +- .../src/test/resources/log4j-example.properties | 2 +- .../src/test/resources/log4j-example.xml | 10 +- .../src/test/resources/log4j2-example.xml | 2 +- .../src/test/resources/logback-example.xml | 12 +- .../rocketmq/store/AppendCallbackTest.java | 8 + .../rocketmq/store/ConsumeQueueExtTest.java | 35 ++--- .../apache/rocketmq/store/ConsumeQueueTest.java | 19 +-- .../rocketmq/store/DefaultMessageStoreTest.java | 10 ++ .../rocketmq/store/MappedFileQueueTest.java | 11 ++ .../apache/rocketmq/store/MappedFileTest.java | 10 ++ .../rocketmq/store/StoreCheckpointTest.java | 10 ++ .../rocketmq/store/index/IndexFileTest.java | 7 + .../rocketmq/test/base/IntegrationTestBase.java | 5 +- 25 files changed, 181 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 86b9c4e..fe30d8f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -18,11 +18,15 @@ package org.apache.rocketmq.broker; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Test; +import java.io.File; + import static org.assertj.core.api.Assertions.assertThat; public class BrokerControllerTest { @@ -44,4 +48,9 @@ public class BrokerControllerTest { brokerController.shutdown(); } } + + @After + public void destory(){ + UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir())); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java index c8412a8..68d6009 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.filter; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -232,7 +233,7 @@ public class ConsumerFilterManagerTest { assertThat(filterData.isDead()).isTrue(); assertThat(filterData.getCompiledExpression()).isNotNull(); } finally { - deleteDirectory("./unit_test"); + UtilAll.deleteFile(new File("./unit_test")); } } @@ -269,23 +270,8 @@ public class ConsumerFilterManagerTest { assertThat(topicData).isNullOrEmpty(); } finally { - deleteDirectory("./unit_test"); + UtilAll.deleteFile(new File("./unit_test")); } } - protected void deleteDirectory(String rootPath) { - File file = new File(rootPath); - deleteFile(file); - } - - protected void deleteFile(File file) { - File[] subFiles = file.listFiles(); - if (subFiles != null) { - for (File sub : subFiles) { - deleteFile(sub); - } - } - - file.delete(); - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 53e563e..461932c 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.filter; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -174,22 +175,6 @@ public class MessageStoreWithFilterTest { return msgs; } - protected void deleteDirectory(String rootPath) { - File file = new File(rootPath); - deleteFile(file); - } - - protected void deleteFile(File file) { - File[] subFiles = file.listFiles(); - if (subFiles != null) { - for (File sub : subFiles) { - deleteFile(sub); - } - } - - file.delete(); - } - protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) { List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>(); @@ -301,7 +286,7 @@ public class MessageStoreWithFilterTest { } finally { master.shutdown(); master.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -386,7 +371,7 @@ public class MessageStoreWithFilterTest { } finally { master.shutdown(); master.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/common/src/main/java/org/apache/rocketmq/common/UtilAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 15d4108..5f5a339 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -505,4 +505,19 @@ public class UtilAll { throw new RuntimeException("Can not get local ip", e); } } + + public static void deleteFile(File file) { + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + } else if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File file1 : files) { + deleteFile(file1); + } + file.delete(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java index 669e30c..53521d4 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.logappender.common; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; @@ -39,20 +40,26 @@ public class ProducerInstance { public static final String DEFAULT_GROUP = "rocketmq_appender"; - private static ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>(); + private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>(); - private static String genKey(String nameServerAddress, String group) { + private static ProducerInstance instance = new ProducerInstance(); + + public static ProducerInstance getProducerInstance() { + return instance; + } + + private String genKey(String nameServerAddress, String group) { return nameServerAddress + "_" + group; } - public static MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { - if (group == null) { + public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { + if (StringUtils.isBlank(group)) { group = DEFAULT_GROUP; } String genKey = genKey(nameServerAddress, group); - MQProducer p = producerMap.get(genKey); + MQProducer p = getProducerInstance().producerMap.get(genKey); if (p != null) { return p; } @@ -60,8 +67,7 @@ public class ProducerInstance { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); defaultMQProducer.setNamesrvAddr(nameServerAddress); MQProducer beforeProducer = null; - //cas put producer - beforeProducer = producerMap.putIfAbsent(genKey, defaultMQProducer); + beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer); if (beforeProducer != null) { return beforeProducer; } @@ -70,22 +76,22 @@ public class ProducerInstance { } - public static void removeAndClose(String nameServerAddress, String group) { + public void removeAndClose(String nameServerAddress, String group) { if (group == null) { group = DEFAULT_GROUP; } String genKey = genKey(nameServerAddress, group); - MQProducer producer = producerMap.remove(genKey); + MQProducer producer = getProducerInstance().producerMap.remove(genKey); if (producer != null) { producer.shutdown(); } } - public static void closeAll() { - Set<Map.Entry<String, MQProducer>> entries = producerMap.entrySet(); + public void closeAll() { + Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet(); for (Map.Entry<String, MQProducer> entry : entries) { - producerMap.remove(entry.getKey()); + getProducerInstance().producerMap.remove(entry.getKey()); entry.getValue().shutdown(); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java index b2983b6..3fd8d4c 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java @@ -66,7 +66,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { return; } try { - producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); + producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup); } catch (Exception e) { LogLog.error("activateOptions nameserver:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } @@ -129,7 +129,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { this.closed = true; try { - ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup); } catch (Exception e) { LogLog.error("Closing RocketmqLog4jAppender [" + name + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java index fb8341f..5a6362e 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java @@ -81,7 +81,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { this.nameServerAddress = nameServerAddress; this.producerGroup = producerGroup; try { - this.producer = ProducerInstance.getInstance(this.nameServerAddress, this.producerGroup); + this.producer = ProducerInstance.getProducerInstance().getInstance(this.nameServerAddress, this.producerGroup); } catch (Exception e) { ErrorHandler handler = this.getHandler(); if (handler != null) { @@ -127,7 +127,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { public boolean stop(long timeout, TimeUnit timeUnit) { this.setStopping(); try { - ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup); } catch (Exception e) { ErrorHandler handler = this.getHandler(); if (handler != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java index cb45522..50ba564 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java @@ -97,7 +97,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { return; } try { - producer = ProducerInstance.getInstance(nameServerAddress, producerGroup); + producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup); } catch (Exception e) { addError("Starting RocketmqLogbackAppender [" + this.getName() + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); @@ -119,7 +119,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { this.started = false; try { - ProducerInstance.removeAndClose(this.nameServerAddress, this.producerGroup); + ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup); } catch (Exception e) { addError("Closeing RocketmqLogbackAppender [" + this.getName() + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java index d3e2f8a..9faebb9 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java @@ -16,139 +16,58 @@ */ package org.apache.rocketmq.logappender; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +import org.apache.rocketmq.common.message.*; import org.apache.rocketmq.logappender.common.ProducerInstance; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.*; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.lang.reflect.Field; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; + /** * Basic test rocketmq broker and name server init */ public class AbstractTestCase { - private static String nameServer = "localhost:9876"; - - private static NamesrvController namesrvController; - - private static BrokerController brokerController; - - private static String topic = "TopicTest"; - - @BeforeClass - public static void startRocketmqService() throws Exception { - - startNamesrv(); - - startBroker(); - } - - /** - * Start rocketmq name server - * @throws Exception - */ - private static void startNamesrv() throws Exception { - - NamesrvConfig namesrvConfig = new NamesrvConfig(); - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - nettyServerConfig.setListenPort(9876); - - namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); - boolean initResult = namesrvController.initialize(); - if (!initResult) { - namesrvController.shutdown(); - throw new Exception(); - } - namesrvController.start(); - } + private static CopyOnWriteArrayList<Message> messages = new CopyOnWriteArrayList<>(); - /** - * Start rocketmq broker service - * @throws Exception - */ - private static void startBroker() throws Exception { - - System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - - BrokerConfig brokerConfig = new BrokerConfig(); - brokerConfig.setNamesrvAddr(nameServer); - brokerConfig.setBrokerId(MixAll.MASTER_ID); - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - nettyServerConfig.setListenPort(10911); - NettyClientConfig nettyClientConfig = new NettyClientConfig(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - - brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); - boolean initResult = brokerController.initialize(); - if (!initResult) { - brokerController.shutdown(); - throw new Exception(); - } - brokerController.start(); + @Before + public void mockLoggerAppender() throws Exception { + DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender")); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + Message message = (Message) invocationOnMock.getArgument(0); + messages.add(message); + return null; + } + }).when(defaultMQProducer).sendOneway(any(Message.class)); + ProducerInstance spy = mock(ProducerInstance.class); + Field instance = ProducerInstance.class.getDeclaredField("instance"); + instance.setAccessible(true); + instance.set(ProducerInstance.class, spy); + doReturn(defaultMQProducer).when(spy).getInstance(anyString(), anyString()); } - @AfterClass - public static void stop() { - ProducerInstance.closeAll(); - if (brokerController != null) { - brokerController.shutdown(); - } + public void clear() { - if (namesrvController != null) { - namesrvController.shutdown(); - } } - protected int consumeMessages(int count,final String key,int timeout) throws MQClientException, InterruptedException { - + protected int consumeMessages(int count, final String key, int timeout) { final AtomicInteger cc = new AtomicInteger(0); - final CountDownLatch countDownLatch = new CountDownLatch(count); - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); - consumer.setNamesrvAddr(nameServer); - consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); - consumer.subscribe(topic, "*"); - - consumer.registerMessageListener(new MessageListenerConcurrently() { - - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - String body = new String(msg.getBody()); - if(key==null||body.contains(key)){ - countDownLatch.countDown(); - cc.incrementAndGet(); - continue; - } - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + for (Message message : messages) { + String body = new String(message.getBody()); + if (body.contains(key)) { + cc.incrementAndGet(); } - }); - consumer.start(); - countDownLatch.await(timeout, TimeUnit.SECONDS); - consumer.shutdown(); + } return cc.get(); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java index 75f9bf2..6306ec5 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java @@ -32,12 +32,13 @@ public abstract class Log4jTest extends AbstractTestCase{ @Test public void testLog4j() throws InterruptedException, MQClientException { + clear(); Logger logger = Logger.getLogger("testLogger"); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 10; i++) { logger.info("log4j " + this.getType() + " simple test message " + i); } - int received = consumeMessages(30, "log4j",30); - Assert.assertTrue(received>20); + int received = consumeMessages(10, "log4j",10); + Assert.assertTrue(received>5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java index 15a21a3..4f9d3e5 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java @@ -44,11 +44,12 @@ public class LogbackTest extends AbstractTestCase{ @Test public void testLogback() throws InterruptedException, MQClientException { + clear(); Logger logger = LoggerFactory.getLogger("testLogger"); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 10; i++) { logger.info("logback test message " + i); } - int received = consumeMessages(30, "logback",30); - Assert.assertTrue(received>20); + int received = consumeMessages(10, "logback",10); + Assert.assertTrue(received>=5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java index 75ba523..4089644 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java @@ -34,11 +34,12 @@ public class log4j2Test extends AbstractTestCase{ @Test public void testLog4j2() throws InterruptedException, MQClientException { + clear(); Logger logger = LogManager.getLogger("test"); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 10; i++) { logger.info("log4j2 log message " + i); } - int received = consumeMessages(30, "log4j2",30); - Assert.assertTrue(received>20); + int received = consumeMessages(10, "log4j2",10); + Assert.assertTrue(received>5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/resources/log4j-example.properties ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j-example.properties b/logappender/src/test/resources/log4j-example.properties index b4e8114..7fdebbb 100644 --- a/logappender/src/test/resources/log4j-example.properties +++ b/logappender/src/test/resources/log4j-example.properties @@ -32,7 +32,7 @@ log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender log4j.appender.mq.Tag=log log4j.appender.mq.Topic=TopicTest -log4j.appender.mq.ProducerGroup=log4jp +log4j.appender.mq.ProducerGroup=loggerAppender log4j.appender.mq.NameServerAddress=127.0.0.1:9876 log4j.appender.mq.layout=org.apache.log4j.PatternLayout log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/resources/log4j-example.xml ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j-example.xml b/logappender/src/test/resources/log4j-example.xml index e58bcb0..b0dc776 100644 --- a/logappender/src/test/resources/log4j-example.xml +++ b/logappender/src/test/resources/log4j-example.xml @@ -29,22 +29,16 @@ limitations under the License. <appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"> <param name="Tag" value="log1" /> <param name="Topic" value="TopicTest" /> - <param name="ProducerGroup" value="log4jxml" /> + <param name="ProducerGroup" value="loggerAppender" /> <param name="NameServerAddress" value="127.0.0.1:9876"/> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" /> </layout> </appender> - <appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender"> - <param name="BufferSize" value="1024" /> - <param name="Blocking" value="false" /> - <appender-ref ref="mqAppender1"/> - </appender> - <logger name="testLogger" additivity="false"> <level value="INFO" /> - <appender-ref ref="mqAsyncAppender1" /> + <appender-ref ref="mqAppender1" /> <appender-ref ref="consoleAppender" /> </logger> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/resources/log4j2-example.xml ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j2-example.xml b/logappender/src/test/resources/log4j2-example.xml index 358d40e..3ee8a01 100644 --- a/logappender/src/test/resources/log4j2-example.xml +++ b/logappender/src/test/resources/log4j2-example.xml @@ -18,7 +18,7 @@ <Configuration status="warn" name="Rocketmq"> <Appenders> - <RocketMQ name="rocketmqAppender" producerGroup="log4j2" nameServerAddress="127.0.0.1:9876" + <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876" topic="TopicTest" tag="log"> <PatternLayout pattern="%d [%p] hahahah %c %m%n"/> </RocketMQ> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/logappender/src/test/resources/logback-example.xml ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/logback-example.xml b/logappender/src/test/resources/logback-example.xml index 21b5434..3786137 100644 --- a/logappender/src/test/resources/logback-example.xml +++ b/logappender/src/test/resources/logback-example.xml @@ -58,21 +58,13 @@ <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"> <tag>log1</tag> <topic>TopicTest</topic> - <producerGroup>logback</producerGroup> + <producerGroup>loggerAppender</producerGroup> <nameServerAddress>127.0.0.1:9876</nameServerAddress> <layout> <pattern>%date %p %t - %m%n</pattern> </layout> </appender> - <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender"> - <queueSize>1024</queueSize> - <discardingThreshold>80</discardingThreshold> - <maxFlushTime>2000</maxFlushTime> - <neverBlock>true</neverBlock> - <appender-ref ref="mqAppender1"/> - </appender> - <root> <level value="debug"/> <appender-ref ref="consoleAppender"/> @@ -83,7 +75,7 @@ </logger> <logger name="testLogger" level="debug" additivity="false"> - <appender-ref ref="mqAsyncAppender1"/> + <appender-ref ref="mqAppender1"/> <appender-ref ref="consoleAppender"/> </logger> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index fc667b6..eaa18d5 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -25,11 +25,14 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; + +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,6 +61,11 @@ public class AppendCallbackTest { callback = commitLog.new DefaultAppendMessageCallback(1024); } + @After + public void destroy(){ + UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); + } + @Test public void testAppendMessageBatchEndOfFile() throws Exception{ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java index 5dbc584..6c2f5ad 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.store; +import org.apache.rocketmq.common.UtilAll; +import org.junit.After; import org.junit.Test; import java.io.File; @@ -62,22 +64,6 @@ public class ConsumeQueueExtTest { return cqExtUnit; } - protected void deleteDirectory(String rootPath) { - File file = new File(rootPath); - deleteFile(file); - } - - protected void deleteFile(File file) { - File[] subFiles = file.listFiles(); - if (subFiles != null) { - for (File sub : subFiles) { - deleteFile(sub); - } - } - - file.delete(); - } - protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut, boolean unitSameSize, int unitCount) { for (int i = 0; i < unitCount; i++) { @@ -111,7 +97,7 @@ public class ConsumeQueueExtTest { putSth(consumeQueueExt, true, false, unitCount); } finally { consumeQueueExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -139,7 +125,7 @@ public class ConsumeQueueExtTest { } } finally { consumeQueueExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -161,7 +147,7 @@ public class ConsumeQueueExtTest { assertThat(unit).isNull(); } finally { consumeQueueExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -199,7 +185,7 @@ public class ConsumeQueueExtTest { } finally { putCqExt.destroy(); loadCqExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -222,7 +208,7 @@ public class ConsumeQueueExtTest { assertThat(expectMinAddress).isEqualTo(minAddress); } finally { consumeQueueExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } @@ -245,7 +231,12 @@ public class ConsumeQueueExtTest { assertThat(expectMaxAddress).isEqualTo(maxAddress); } finally { consumeQueueExt.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } + + @After + public void destroy(){ + UtilAll.deleteFile(new File(storePath)); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 9c42fb9..d07a768 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -131,22 +132,6 @@ public class ConsumeQueueTest { } } - protected void deleteDirectory(String rootPath) { - File file = new File(rootPath); - deleteFile(file); - } - - protected void deleteFile(File file) { - File[] subFiles = file.listFiles(); - if (subFiles != null) { - for (File sub : subFiles) { - deleteFile(sub); - } - } - - file.delete(); - } - @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null; @@ -220,7 +205,7 @@ public class ConsumeQueueTest { } finally { master.shutdown(); master.destroy(); - deleteDirectory(storePath); + UtilAll.deleteFile(new File(storePath)); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 273cc21..ac78a1d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store; +import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -24,8 +25,10 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; @@ -47,6 +50,13 @@ public class DefaultMessageStoreTest { BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } + @After + public void destory() { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + File file = new File(messageStoreConfig.getStorePathRootDir()); + UtilAll.deleteFile(file); + } + public MessageStore buildMessageStore() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index f1f9c1f..203dfcd 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -17,8 +17,13 @@ package org.apache.rocketmq.store; +import java.io.File; import java.nio.ByteBuffer; import java.util.Arrays; + +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -176,4 +181,10 @@ public class MappedFileQueueTest { mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); } + + @After + public void destory() { + File file = new File("target/unit_test_store"); + UtilAll.deleteFile(file); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java index d2736ac..50d0ae4 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileTest.java @@ -20,7 +20,11 @@ */ package org.apache.rocketmq.store; +import java.io.File; import java.io.IOException; + +import org.apache.rocketmq.common.UtilAll; +import org.junit.After; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -47,4 +51,10 @@ public class MappedFileTest { assertThat(mappedFile.isCleanupOver()).isTrue(); assertThat(mappedFile.destroy(1000)).isTrue(); } + + @After + public void destory() { + File file = new File("target/unit_test_store"); + UtilAll.deleteFile(file); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java index 1af63d4..3c0d925 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreCheckpointTest.java @@ -20,7 +20,11 @@ */ package org.apache.rocketmq.store; +import java.io.File; import java.io.IOException; + +import org.apache.rocketmq.common.UtilAll; +import org.junit.After; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -42,4 +46,10 @@ public class StoreCheckpointTest { assertThat(storeCheckpoint.getPhysicMsgTimestamp()).isEqualTo(physicMsgTimestamp); assertThat(storeCheckpoint.getLogicsMsgTimestamp()).isEqualTo(logicsMsgTimestamp); } + + @After + public void destory() { + File file = new File("target/checkpoint_test"); + UtilAll.deleteFile(file); + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java index f172e65..7ad5b38 100644 --- a/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexFileTest.java @@ -20,8 +20,11 @@ */ package org.apache.rocketmq.store.index; +import java.io.File; import java.util.ArrayList; import java.util.List; + +import org.apache.rocketmq.common.UtilAll; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -42,6 +45,8 @@ public class IndexFileTest { boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertThat(putResult).isFalse(); indexFile.destroy(0); + File file = new File("100"); + UtilAll.deleteFile(file); } @Test @@ -62,5 +67,7 @@ public class IndexFileTest { assertThat(phyOffsets).isNotEmpty(); assertThat(phyOffsets.size()).isEqualTo(1); indexFile.destroy(0); + File file = new File("200"); + UtilAll.deleteFile(file); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aa1c7577/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 61e98e2..64911fb 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.netty.NettyClientConfig; @@ -75,7 +76,7 @@ public class IntegrationTestBase { } } for (File file : TMPE_FILES) { - deleteFile(file); + UtilAll.deleteFile(file); } } catch (Exception e){ logger.error("Shutdown error", e); @@ -187,5 +188,5 @@ public class IntegrationTestBase { file.delete(); } } - + }