This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4cc331159 [ISSUE #6123] Fix flaky test in tiered storage (#6124)
4cc331159 is described below
commit 4cc33115903a8eeaba30d635e1f2dc356770e5f7
Author: SSpirits <[email protected]>
AuthorDate: Tue Feb 21 10:09:05 2023 +0800
[ISSUE #6123] Fix flaky test in tiered storage (#6124)
* fix flaky test in tiered storage
* add debug logs
* destroy metadata store and container manager after TieredMessageStoreTest
* remove debug logs
---
.../container/TieredContainerManager.java | 11 +++-
.../tieredstore/container/TieredFileQueue.java | 7 ++-
.../provider/posix/PosixFileSegment.java | 8 +--
.../rocketmq/tieredstore/util/TieredStoreUtil.java | 4 ++
.../rocketmq/tieredstore/TieredDispatcherTest.java | 20 +++----
.../tieredstore/TieredMessageFetcherTest.java | 23 ++++----
.../tieredstore/TieredMessageStoreTest.java | 10 ++--
.../rocketmq/tieredstore/TieredStoreTestUtil.java | 62 ++++++++++++++++++++++
.../container/TieredContainerManagerTest.java | 21 +++++---
.../tieredstore/container/TieredFileQueueTest.java | 15 +++---
.../tieredstore/container/TieredIndexFileTest.java | 22 ++++----
.../container/TieredMessageQueueContainerTest.java | 22 ++++----
.../tieredstore/metadata/MetadataStoreTest.java | 20 +++----
.../metrics/TieredStoreMetricsManagerTest.java | 10 ++++
.../tieredstore/mock/MemoryFileSegment.java | 4 +-
...ent.java => MemoryFileSegmentWithoutCheck.java} | 62 ++--------------------
.../provider/posix/PosixFileSegmentTest.java | 13 +++--
17 files changed, 190 insertions(+), 144 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
index ca2f4f81f..94f1e048d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java
@@ -44,12 +44,17 @@ public class TieredContainerManager {
private final TieredMessageStoreConfig storeConfig;
public static TieredContainerManager getInstance(TieredMessageStoreConfig
storeConfig) {
+ if (storeConfig == null) {
+ return instance;
+ }
+
if (instance == null) {
synchronized (TieredContainerManager.class) {
if (instance == null) {
try {
instance = new TieredContainerManager(storeConfig);
- } catch (Exception ignored) {
+ } catch (Exception e) {
+ logger.error("TieredContainerManager#getInstance:
create container manager failed", e);
}
}
}
@@ -58,6 +63,10 @@ public class TieredContainerManager {
}
public static TieredIndexFile getIndexFile(TieredMessageStoreConfig
storeConfig) {
+ if (storeConfig == null) {
+ return indexFile;
+ }
+
if (indexFile == null) {
synchronized (TieredContainerManager.class) {
if (indexFile == null) {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java
index 8ad1b1491..1640e8daf 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java
@@ -51,7 +51,7 @@ public class TieredFileQueue {
private final TieredMessageStoreConfig storeConfig;
private final TieredMetadataStore metadataStore;
- private final List<TieredFileSegment> fileSegmentList = new ArrayList<>();
+ protected final List<TieredFileSegment> fileSegmentList = new
ArrayList<>();
protected final List<TieredFileSegment> needCommitFileSegmentList = new
CopyOnWriteArrayList<>();
private final ReentrantReadWriteLock fileSegmentLock = new
ReentrantReadWriteLock();
@@ -130,7 +130,10 @@ public class TieredFileQueue {
}
}
- private void loadFromMetadata() {
+ protected void loadFromMetadata() {
+ fileSegmentList.clear();
+ needCommitFileSegmentList.clear();
+
metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(),
messageQueue.getQueueId(), metadata -> {
if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
return;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 9def6bd29..b83967db2 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -129,10 +129,6 @@ public class PosixFileSegment extends TieredFileSegment {
@Override
public void destroyFile() {
- if (file.exists()) {
- file.delete();
- }
-
try {
if (readFileChannel != null && readFileChannel.isOpen()) {
readFileChannel.close();
@@ -143,6 +139,10 @@ public class PosixFileSegment extends TieredFileSegment {
} catch (IOException e) {
logger.error("PosixFileSegment#destroyFile: destroy file {}
failed: ", filepath, e);
}
+
+ if (file.exists()) {
+ file.delete();
+ }
}
@Override
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
index c41e5a48e..d1ba8f761 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
@@ -135,6 +135,10 @@ public class TieredStoreUtil {
}
public static TieredMetadataStore
getMetadataStore(TieredMessageStoreConfig storeConfig) {
+ if (storeConfig == null) {
+ return metadataStoreInstance;
+ }
+
if (metadataStoreInstance == null) {
synchronized (TieredMetadataStore.class) {
if (metadataStoreInstance == null) {
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index 33e908824..a89f736e8 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -44,16 +44,17 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TieredDispatcherTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue mq;
- TieredMetadataStore metadataStore;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue mq;
+ private TieredMetadataStore metadataStore;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
- MemoryFileSegment.checkSize = false;
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
-
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setStorePathRootDir(storePath);
+
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
storeConfig.setBrokerName(storeConfig.getBrokerName());
mq = new MessageQueue("TieredMessageQueueContainerTest",
storeConfig.getBrokerName(), 0);
metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
@@ -61,10 +62,9 @@ public class TieredDispatcherTest {
@After
public void tearDown() throws IOException {
- MemoryFileSegment.checkSize = true;
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
- TieredContainerManager.getInstance(storeConfig).cleanup();
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Test
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 9dd94ccf6..2d2c5d5f2 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -38,8 +38,6 @@ import
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
import org.apache.rocketmq.tieredstore.container.TieredMessageQueueContainer;
-import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -51,30 +49,29 @@ import org.junit.Before;
import org.junit.Test;
public class TieredMessageFetcherTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue mq;
- TieredMetadataStore metadataStore;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue mq;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
- MemoryFileSegment.checkSize = false;
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
storeConfig.setBrokerName(storeConfig.getBrokerName());
storeConfig.setReadAheadCacheExpireDuration(Long.MAX_VALUE);
-
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
- metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
mq = new MessageQueue("TieredMessageFetcherTest",
storeConfig.getBrokerName(), 0);
+ TieredStoreUtil.getMetadataStore(storeConfig);
}
@After
public void tearDown() throws IOException {
- MemoryFileSegment.checkSize = true;
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
- TieredContainerManager.getInstance(storeConfig).cleanup();
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
public Triple<TieredMessageFetcher, ByteBuffer, ByteBuffer> buildFetcher()
{
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 800b10938..c16ba141c 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -67,10 +67,12 @@ public class TieredMessageStoreTest {
private Configuration configuration;
private TieredContainerManager containerManager;
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
+
@Before
public void setUp() {
storeConfig = new MessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
mq = new MessageQueue("TieredMessageStoreTest", "broker", 0);
nextStore = Mockito.mock(DefaultMessageStore.class);
@@ -102,9 +104,9 @@ public class TieredMessageStoreTest {
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(store.getStoreConfig()).destroy();
- TieredContainerManager.getInstance(store.getStoreConfig()).cleanup();
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
private void mockContainer() {
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java
new file mode 100644
index 000000000..c537a83c9
--- /dev/null
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
+import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.junit.Assert;
+
+public class TieredStoreTestUtil {
+ public static void destroyMetadataStore() {
+ TieredMetadataStore metadataStore =
TieredStoreUtil.getMetadataStore(null);
+ if (metadataStore != null) {
+ metadataStore.destroy();
+ }
+ try {
+ Field field =
TieredStoreUtil.class.getDeclaredField("metadataStoreInstance");
+ field.setAccessible(true);
+ field.set(null, null);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ Assert.fail(e.getClass().getCanonicalName() + ": " +
e.getMessage());
+ }
+ }
+
+ public static void destroyContainerManager() {
+ TieredContainerManager containerManager =
TieredContainerManager.getInstance(null);
+ if (containerManager != null) {
+ containerManager.destroy();
+ }
+ try {
+ Field field =
TieredContainerManager.class.getDeclaredField("instance");
+ field.setAccessible(true);
+ field.set(null, null);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ Assert.fail(e.getClass().getCanonicalName() + ": " +
e.getMessage());
+ }
+ }
+
+ public static void destroyTempDir(String storePath) {
+ try {
+ FileUtils.deleteDirectory(new File(storePath));
+ } catch (Exception ignore) {
+ }
+ }
+}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
index 1c8254d98..2f8ad3615 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -32,14 +33,16 @@ import org.junit.Before;
import org.junit.Test;
public class TieredContainerManagerTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue mq;
- TieredMetadataStore metadataStore;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue mq;
+ private TieredMetadataStore metadataStore;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
storeConfig.setBrokerName(storeConfig.getBrokerName());
mq = new MessageQueue("TieredContainerManagerTest",
storeConfig.getBrokerName(), 0);
@@ -48,9 +51,9 @@ public class TieredContainerManagerTest {
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
- TieredContainerManager.getInstance(storeConfig).cleanup();
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@@ -64,7 +67,9 @@ public class TieredContainerManagerTest {
boolean load = containerManager.load();
Assert.assertTrue(load);
- Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() ->
containerManager.getAllMQContainer().size() == 2);
+ Awaitility.await()
+ .atMost(3, TimeUnit.SECONDS)
+ .until(() -> containerManager.getAllMQContainer().size() == 2);
TieredMessageQueueContainer container =
containerManager.getMQContainer(mq);
Assert.assertNotNull(container);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
index 6385fa281..60f751a62 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
@@ -33,21 +34,23 @@ import org.junit.Before;
import org.junit.Test;
public class TieredFileQueueTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue queue;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue queue;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
queue = new MessageQueue("TieredFileQueueTest",
storeConfig.getBrokerName(), 0);
}
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Test
@@ -149,7 +152,7 @@ public class TieredFileQueueTest {
TieredFileSegment fileSegment1 = new
MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
queue, 100, storeConfig);
fileSegment1.initPosition(fileSegment1.getSize() - 100);
- fileSegment1.setFull(false);
+ fileSegment1.setFull();
metadataStore.updateFileSegment(fileSegment1);
metadataStore.updateFileSegment(fileSegment1);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
index 0824cf35d..6a114e7ca 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
@@ -26,9 +26,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.awaitility.Awaitility;
import org.junit.After;
@@ -39,27 +38,26 @@ import org.junit.Ignore;
import org.junit.Test;
public class TieredIndexFileTest {
- MessageQueue mq;
- TieredMessageStoreConfig storeConfig;
- TieredMetadataStore metadataStore;
+ private MessageQueue mq;
+ private TieredMessageStoreConfig storeConfig;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
- MemoryFileSegment.checkSize = false;
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
-
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setStorePathRootDir(storePath);
+
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
mq = new MessageQueue("TieredIndexFileTest",
storeConfig.getBrokerName(), 1);
- metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+ TieredStoreUtil.getMetadataStore(storeConfig);
}
@After
public void tearDown() throws IOException {
- MemoryFileSegment.checkSize = true;
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
-// metadataStore.reLoadStore();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Ignore
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
index a9eb444c9..11afa362b 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
@@ -40,14 +41,16 @@ import org.junit.Before;
import org.junit.Test;
public class TieredMessageQueueContainerTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue mq;
- TieredMetadataStore metadataStore;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue mq;
+ private TieredMetadataStore metadataStore;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setCommitLogRollingMinimumSize(999);
@@ -57,14 +60,13 @@ public class TieredMessageQueueContainerTest {
@After
public void tearDown() throws IOException {
- MemoryFileSegment.checkSize = true;
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
- TieredContainerManager.getInstance(storeConfig).cleanup();
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Test
- public void testAppendCommitLog() throws ClassNotFoundException,
NoSuchMethodException, IOException {
+ public void testAppendCommitLog() throws ClassNotFoundException,
NoSuchMethodException {
TieredMessageQueueContainer container = new
TieredMessageQueueContainer(mq, storeConfig);
ByteBuffer message = MessageBufferUtilTest.buildMessageBuffer();
AppendResult result = container.appendCommitLog(message);
@@ -136,7 +138,7 @@ public class TieredMessageQueueContainerTest {
@Test
public void testBinarySearchInQueueByTime() throws ClassNotFoundException,
NoSuchMethodException {
- MemoryFileSegment.checkSize = false;
+
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
TieredMessageQueueContainer container = new
TieredMessageQueueContainer(mq, storeConfig);
container.initOffset(50);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
index 4832d1246..96539d1c4 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
@@ -26,27 +26,29 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MetadataStoreTest {
- MessageQueue mq0;
- MessageQueue mq1;
- MessageQueue mq2;
- TieredMessageStoreConfig storeConfig;
- TieredMetadataStore metadataStore;
+ private MessageQueue mq0;
+ private MessageQueue mq1;
+ private MessageQueue mq2;
+ private TieredMessageStoreConfig storeConfig;
+ private TieredMetadataStore metadataStore;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setStorePathRootDir(storePath);
mq0 = new MessageQueue("MetadataStoreTest0",
storeConfig.getBrokerName(), 0);
mq1 = new MessageQueue("MetadataStoreTest1",
storeConfig.getBrokerName(), 0);
mq2 = new MessageQueue("MetadataStoreTest1",
storeConfig.getBrokerName(), 1);
@@ -55,8 +57,8 @@ public class MetadataStoreTest {
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
- TieredStoreUtil.getMetadataStore(storeConfig).destroy();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Test
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
index dea8f503f..170728d4b 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
@@ -17,12 +17,22 @@
package org.apache.rocketmq.tieredstore.metrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;
+import java.io.IOException;
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.junit.After;
import org.junit.Test;
public class TieredStoreMetricsManagerTest {
+ @After
+ public void tearDown() throws IOException {
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ }
+
+
@Test
public void getMetricsView() {
TieredStoreMetricsManager.getMetricsView();
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
index 25f4a6b6c..254b151e6 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
@@ -26,11 +26,11 @@ import
org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.junit.Assert;
public class MemoryFileSegment extends TieredFileSegment {
- private final ByteBuffer memStore;
+ protected final ByteBuffer memStore;
public CompletableFuture<Boolean> blocker;
- public static boolean checkSize = true;
+ protected boolean checkSize = true;
public MemoryFileSegment(TieredFileSegment.FileSegmentType fileType,
MessageQueue messageQueue, long baseOffset,
TieredMessageStoreConfig storeConfig) {
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
similarity index 57%
copy from
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
copy to
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
index 25f4a6b6c..f7e5488da 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
@@ -16,69 +16,25 @@
*/
package org.apache.rocketmq.tieredstore.mock;
-import java.io.File;
-import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.junit.Assert;
-public class MemoryFileSegment extends TieredFileSegment {
- private final ByteBuffer memStore;
+public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
- public CompletableFuture<Boolean> blocker;
-
- public static boolean checkSize = true;
-
- public MemoryFileSegment(TieredFileSegment.FileSegmentType fileType,
MessageQueue messageQueue, long baseOffset,
+ public MemoryFileSegmentWithoutCheck(FileSegmentType fileType,
+ MessageQueue messageQueue, long baseOffset,
TieredMessageStoreConfig storeConfig) {
super(fileType, messageQueue, baseOffset, storeConfig);
- switch (fileType) {
- case COMMIT_LOG:
- memStore = ByteBuffer.allocate(10000);
- break;
- case CONSUME_QUEUE:
- memStore = ByteBuffer.allocate(10000);
- break;
- case INDEX:
- memStore = ByteBuffer.allocate(10000);
- break;
- default:
- memStore = null;
- break;
- }
- memStore.position((int) getSize());
- }
-
- @Override
- public String getPath() {
- return "/tiered/" + fileType + File.separator + baseOffset;
}
@Override
public long getSize() {
- if (checkSize) {
- return 1000;
- }
return 0;
}
- @Override
- public void createFile() {
-
- }
-
- @Override
- public CompletableFuture<ByteBuffer> read0(long position, int length) {
- ByteBuffer buffer = memStore.duplicate();
- buffer.position((int) position);
- ByteBuffer slice = buffer.slice();
- slice.limit(length);
- return CompletableFuture.completedFuture(slice);
- }
-
@Override
public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream
inputStream, long position, int length,
boolean append) {
@@ -90,8 +46,6 @@ public class MemoryFileSegment extends TieredFileSegment {
Assert.fail(e.getMessage());
}
- Assert.assertTrue(!checkSize || position >= getSize());
-
byte[] buffer = new byte[1024];
int startPos = memStore.position();
@@ -107,14 +61,4 @@ public class MemoryFileSegment extends TieredFileSegment {
}
return CompletableFuture.completedFuture(true);
}
-
- @Override
- public boolean exists() {
- return false;
- }
-
- @Override
- public void destroyFile() {
-
- }
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
index 0f2ee2f37..736da0637 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.junit.After;
@@ -33,19 +34,23 @@ import org.junit.Before;
import org.junit.Test;
public class PosixFileSegmentTest {
- TieredMessageStoreConfig storeConfig;
- MessageQueue mq;
+ private TieredMessageStoreConfig storeConfig;
+ private MessageQueue mq;
+
+ private final String storePath = FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID();
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
- storeConfig.setTieredStoreFilepath(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+ storeConfig.setTieredStoreFilepath(storePath);
mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
}
@After
public void tearDown() throws IOException {
- FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() +
File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
+ TieredStoreTestUtil.destroyContainerManager();
+ TieredStoreTestUtil.destroyMetadataStore();
+ TieredStoreTestUtil.destroyTempDir(storePath);
}
@Test