This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DirectByteBuffer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16e3dc365107b2eebeb9644eadafc5cb56517269 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jan 12 15:50:48 2021 +0800 finish --- .../db/engine/storagegroup/StorageGroupInfo.java | 51 +++++++++++-- .../engine/storagegroup/StorageGroupProcessor.java | 87 +++++++++++++++++++++- .../db/engine/storagegroup/TsFileProcessor.java | 12 +-- .../java/org/apache/iotdb/db/utils/MmapUtil.java | 71 ++++++++++++++++++ .../writelog/manager/MultiFileLogNodeManager.java | 36 +++++---- .../db/writelog/manager/WriteLogNodeManager.java | 7 +- .../db/writelog/node/ExclusiveWriteLogNode.java | 24 ++++-- .../iotdb/db/writelog/node/WriteLogNode.java | 3 +- .../iotdb/db/writelog/recover/LogReplayer.java | 12 ++- .../writelog/recover/TsFileRecoverPerformer.java | 16 ++-- .../iotdb/db/writelog/IoTDBLogFileSizeTest.java | 25 ++++++- .../apache/iotdb/db/writelog/PerformanceTest.java | 50 ++++++++++--- .../iotdb/db/writelog/WriteLogNodeManagerTest.java | 63 +++++++++++++--- .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 78 +++++++++++++++---- .../iotdb/db/writelog/recover/LogReplayerTest.java | 24 +++++- .../recover/RecoverResourceFromReaderTest.java | 54 +++++++++++--- .../db/writelog/recover/SeqTsFileRecoverTest.java | 51 +++++++++++-- .../writelog/recover/UnseqTsFileRecoverTest.java | 31 +++++++- 18 files changed, 579 insertions(+), 116 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index a31d41a..d0620a0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -18,12 +18,18 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.db.utils.MmapUtil; +import org.apache.iotdb.db.utils.TestOnly; /** * The storageGroupInfo records the total memory cost of the Storage Group. @@ -33,15 +39,15 @@ public class StorageGroupInfo { private StorageGroupProcessor storageGroupProcessor; /** - * The total Storage group memory cost, - * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values + * The total Storage group memory cost, including unsealed TsFileResource, ChunkMetadata, WAL, + * primitive arrays and TEXT values */ private AtomicLong memoryCost; /** * The threshold of reporting it's size to SystemInfo */ - private long storageGroupSizeReportThreshold = + private long storageGroupSizeReportThreshold = IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold(); private AtomicLong lastReportedSize = new AtomicLong(); @@ -94,13 +100,46 @@ public class StorageGroupInfo { } /** - * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo - * to update SG cost. - * + * When a TsFileProcessor is closing, remove it from reportedTsps, and report to systemInfo to + * update SG cost. + * * @param tsFileProcessor */ public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) { reportedTsps.remove(tsFileProcessor); SystemInfo.getInstance().resetStorageGroupStatus(this, true); } + + public Supplier<ByteBuffer[]> getWalSupplier() { + if (storageGroupProcessor != null) { + return storageGroupProcessor::getWalDirectByteBuffer; + } else { // only happens in test + return this::walSupplier; + } + } + + @TestOnly + private ByteBuffer[] walSupplier() { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + } + + public Consumer<ByteBuffer[]> getWalConsumer() { + if (storageGroupProcessor != null) { + return storageGroupProcessor::releaseWalBuffer; + } else { // only happens in test + return this::walConsumer; + } + } + + @TestOnly + private void walConsumer(ByteBuffer[] buffers) { + for (ByteBuffer byteBuffer : buffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index d32386f..9e2479b 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -25,11 +25,14 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -38,7 +41,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; @@ -84,6 +90,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.service.UpgradeSevice; import org.apache.iotdb.db.utils.CopyOnReadLinkedList; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; import org.apache.iotdb.rpc.RpcUtils; @@ -261,6 +268,79 @@ public class StorageGroupProcessor { private List<CloseFileListener> customCloseFileListeners = Collections.emptyList(); private List<FlushListener> customFlushListeners = Collections.emptyList(); + private static final int WAL_BUFFER_SIZE = + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2; + + private static final int MAX_WAL_BYTEBUFFER_NUM = + IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4; + + private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000; + + private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>(); + + private int currentWalPoolSize = 0; + + + /** + * get the direct byte buffer from pool, each fetch contains two ByteBuffer + */ + public ByteBuffer[] getWalDirectByteBuffer() { + ByteBuffer[] res = new ByteBuffer[2]; + synchronized (walByteBufferPool) { + while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) { + try { + walByteBufferPool.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger + .error("getDirectByteBuffer occurs error while waiting for DirectByteBuffer" + + "group {}", storageGroupName, e); + } + } + // If the queue is not empty, it must have at least two. + if (!walByteBufferPool.isEmpty()) { + res[0] = walByteBufferPool.pollFirst(); + res[1] = walByteBufferPool.pollFirst(); + } else { + // if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM + // we can construct another two more new byte buffer + currentWalPoolSize += 2; + res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE); + res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE); + } + } + return res; + } + + /** + * put the byteBuffer back to pool + */ + public void releaseWalBuffer(ByteBuffer[] byteBuffers) { + for (ByteBuffer byteBuffer : byteBuffers) { + byteBuffer.clear(); + } + synchronized (walByteBufferPool) { + walByteBufferPool.addLast(byteBuffers[0]); + walByteBufferPool.addLast(byteBuffers[1]); + walByteBufferPool.notifyAll(); + } + } + + /** + * trim the size of the pool and release the memory of needless direct byte buffer + */ + private void trimTask() { + synchronized (walByteBufferPool) { + int expectedSize = + (workSequenceTsFileProcessors.size() + workUnsequenceTsFileProcessors.size()) * 2; + while (expectedSize < currentWalPoolSize && !walByteBufferPool.isEmpty()) { + MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); + MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); + currentWalPoolSize -= 2; + } + } + } + public StorageGroupProcessor(String systemDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException { this.storageGroupName = storageGroupName; @@ -277,6 +357,9 @@ public class StorageGroupProcessor { this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy() .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath()); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleWithFixedDelay(this::trimTask, DEFAULT_POOL_TRIM_INTERVAL_MILLIS, + DEFAULT_POOL_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); recover(); } @@ -580,7 +663,7 @@ public class StorageGroupProcessor { try { // this tsfile is not zero level, no need to perform redo wal if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) { - writer = recoverPerformer.recover(false); + writer = recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer); if (writer.hasCrashed()) { tsFileManagement.addRecover(tsFileResource, isSeq); } else { @@ -589,7 +672,7 @@ public class StorageGroupProcessor { } continue; } else { - writer = recoverPerformer.recover(true); + writer = recoverPerformer.recover(true, this::getWalDirectByteBuffer, this::releaseWalBuffer); } } catch (StorageGroupProcessorException e) { logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 5b404c9..c632782 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -177,8 +177,7 @@ public class TsFileProcessor { if (enableMemControl) { workMemTable = new PrimitiveMemTable(enableMemControl); MemTableManager.getInstance().addMemtableNumber(); - } - else { + } else { workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName); } } @@ -226,8 +225,7 @@ public class TsFileProcessor { if (enableMemControl) { workMemTable = new PrimitiveMemTable(enableMemControl); MemTableManager.getInstance().addMemtableNumber(); - } - else { + } else { workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName); } } @@ -871,7 +869,8 @@ public class TsFileProcessor { public WriteLogNode getLogNode() { if (logNode == null) { logNode = MultiFileLogNodeManager.getInstance() - .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName()); + .getNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(), + storageGroupInfo.getWalSupplier()); } return logNode; } @@ -881,7 +880,8 @@ public class TsFileProcessor { // when closing resource file, its corresponding mod file is also closed. tsFileResource.close(); MultiFileLogNodeManager.getInstance() - .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName()); + .deleteNode(storageGroupName + "-" + tsFileResource.getTsFile().getName(), + storageGroupInfo.getWalConsumer()); } catch (IOException e) { throw new TsFileProcessorException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java new file mode 100644 index 0000000..8ca1cc6 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.utils; + +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; + +public class MmapUtil { + + public static void clean(MappedByteBuffer mappedByteBuffer) { + if (mappedByteBuffer == null || !mappedByteBuffer.isDirect() || mappedByteBuffer.capacity()== 0) + return; + invoke(invoke(viewed(mappedByteBuffer), "cleaner"), "clean"); + } + + private static Object invoke(final Object target, final String methodName, final Class<?>... args) { + return AccessController.doPrivileged((PrivilegedAction<Object>) () -> { + try { + Method method = method(target, methodName, args); + method.setAccessible(true); + return method.invoke(target); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + } + + private static Method method(Object target, String methodName, Class<?>[] args) + throws NoSuchMethodException { + try { + return target.getClass().getMethod(methodName, args); + } catch (NoSuchMethodException e) { + return target.getClass().getDeclaredMethod(methodName, args); + } + } + + private static ByteBuffer viewed(ByteBuffer buffer) { + String methodName = "viewedBuffer"; + Method[] methods = buffer.getClass().getMethods(); + for (Method method : methods) { + if (method.getName().equals("attachment")) { + methodName = "attachment"; + break; + } + } + ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName); + if (viewedBuffer == null) + return buffer; + else + return viewed(viewedBuffer); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java index f07d69f..00b9c44 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java @@ -19,11 +19,14 @@ package org.apache.iotdb.db.writelog.manager; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; @@ -35,18 +38,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile - * (either seq or unseq). + * MultiFileLogNodeManager manages all ExclusiveWriteLogNodes, each manages WALs of a TsFile (either + * seq or unseq). */ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService { private static final Logger logger = LoggerFactory.getLogger(MultiFileLogNodeManager.class); - private Map<String, WriteLogNode> nodeMap; + private final Map<String, WriteLogNode> nodeMap; private ScheduledExecutorService executorService; - private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private final void forceTask() { + private void forceTask() { if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) { logger.warn("system mode is read-only, the force flush WAL task is stopped"); return; @@ -75,23 +78,16 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService { @Override - public WriteLogNode getNode(String identifier) { - WriteLogNode node = nodeMap.get(identifier); - if (node == null) { - node = new ExclusiveWriteLogNode(identifier); - WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node); - if (oldNode != null) { - return oldNode; - } - } - return node; + public WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier) { + return nodeMap + .computeIfAbsent(identifier, key -> new ExclusiveWriteLogNode(key, supplier.get())); } @Override - public void deleteNode(String identifier) throws IOException { + public void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException { WriteLogNode node = nodeMap.remove(identifier); if (node != null) { - node.delete(); + consumer.accept(node.delete()); } } @@ -148,9 +144,11 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService { } private static class InstanceHolder { - private InstanceHolder(){} - private static MultiFileLogNodeManager instance = new MultiFileLogNodeManager(); + private InstanceHolder() { + } + + private static final MultiFileLogNodeManager instance = new MultiFileLogNodeManager(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java index 84d02fc..f46d57a 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/WriteLogNodeManager.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.writelog.manager; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.iotdb.db.writelog.node.WriteLogNode; /** @@ -33,14 +36,14 @@ public interface WriteLogNodeManager { * @param identifier -identifier, the format: "{storageGroupName}-{BufferWrite/Overflow}-{ * nameOfTsFile}" */ - WriteLogNode getNode(String identifier); + WriteLogNode getNode(String identifier, Supplier<ByteBuffer[]> supplier); /** * Delete a log node. If the log node does not exist, this will be an empty operation. * * @param identifier -identifier */ - void deleteNode(String identifier) throws IOException; + void deleteNode(String identifier, Consumer<ByteBuffer[]> consumer) throws IOException; /** * Close all nodes. diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java index f241db5..6c73713 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java @@ -59,10 +59,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private ByteBuffer logBufferWorking = ByteBuffer - .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); - private ByteBuffer logBufferIdle = ByteBuffer - .allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + private ByteBuffer logBufferWorking; + private ByteBuffer logBufferIdle; private ByteBuffer logBufferFlushing; private final Object switchBufferCondition = new Object(); @@ -83,10 +81,12 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive * * @param identifier ExclusiveWriteLogNode identifier */ - public ExclusiveWriteLogNode(String identifier) { + public ExclusiveWriteLogNode(String identifier, ByteBuffer[] byteBuffers) { this.identifier = identifier; this.logDirectory = DirectoryManager.getInstance().getWALFolder() + File.separator + this.identifier; + this.logBufferWorking = byteBuffers[0]; + this.logBufferIdle = byteBuffers[1]; if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) { logger.info("create the WAL folder {}.", logDirectory); } @@ -197,12 +197,24 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive } @Override - public void delete() throws IOException { + public ByteBuffer[] delete() throws IOException { lock.lock(); try { close(); FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(logDirectory)); deleted = true; + ByteBuffer[] res = new ByteBuffer[2]; + int index = 0; + if (logBufferWorking != null) { + res[index++] = logBufferWorking; + } + if (logBufferIdle != null) { + res[index++] = logBufferIdle; + } + if (logBufferFlushing != null) { + res[index] = logBufferFlushing; + } + return res; } finally { lock.unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java index a93117b..4ca048f 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.writelog.node; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.writelog.io.ILogReader; @@ -76,7 +77,7 @@ public interface WriteLogNode { * Abandon all logs in this node and delete the log directory. Calling insert() after calling * this method is undefined. */ - void delete() throws IOException; + ByteBuffer[] delete() throws IOException; /** * return an ILogReader which can iterate each log in this log node. diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index 4be75cc..2426414 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -20,9 +20,11 @@ package org.apache.iotdb.db.writelog.recover; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.modification.Deletion; @@ -85,9 +87,10 @@ public class LogReplayer { * finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the WALs from * the logNode and redoes them into a given MemTable and ModificationFile. */ - public void replayLogs() { - WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode( - logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName()); + public void replayLogs(Supplier<ByteBuffer[]> supplier) { + WriteLogNode logNode = MultiFileLogNodeManager.getInstance() + .getNode(logNodePrefix + FSFactoryProducer.getFSFactory().getFile(insertFilePath).getName(), + supplier); ILogReader logReader = logNode.getLogReader(); try { @@ -171,7 +174,8 @@ public class LogReplayer { if (plan instanceof InsertRowPlan) { recoverMemTable.insert((InsertRowPlan) plan); } else { - recoverMemTable.insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); + recoverMemTable + .insertTablet((InsertTabletPlan) plan, 0, ((InsertTabletPlan) plan).getRowCount()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index 8120d00..3418847 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -23,10 +23,13 @@ import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SU import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.memtable.IMemTable; @@ -81,8 +84,8 @@ public class TsFileRecoverPerformer { * writing */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - public RestorableTsFileIOWriter recover(boolean needRedoWal) - throws StorageGroupProcessorException { + public RestorableTsFileIOWriter recover(boolean needRedoWal, Supplier<ByteBuffer[]> supplier, + Consumer<ByteBuffer[]> consumer) throws StorageGroupProcessorException { File file = FSFactoryProducer.getFSFactory().getFile(filePath); if (!file.exists()) { @@ -120,12 +123,13 @@ public class TsFileRecoverPerformer { // redo logs if (needRedoWal) { - redoLogs(restorableTsFileIOWriter); + redoLogs(restorableTsFileIOWriter, supplier); // clean logs try { MultiFileLogNodeManager.getInstance() - .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName()); + .deleteNode(logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName(), + consumer); } catch (IOException e) { throw new StorageGroupProcessorException(e); } @@ -196,13 +200,13 @@ public class TsFileRecoverPerformer { tsFileResource.updatePlanIndexes(restorableTsFileIOWriter.getMaxPlanIndex()); } - private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter) + private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter, Supplier<ByteBuffer[]> supplier) throws StorageGroupProcessorException { IMemTable recoverMemTable = new PrimitiveMemTable(); recoverMemTable.setVersion(versionController.nextVersion()); LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(), versionController, tsFileResource, recoverMemTable, sequence); - logReplayer.replayLogs(); + logReplayer.replayLogs(supplier); try { if (!recoverMemTable.isEmpty()) { // flush logs diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java index 2a7a5ed..8386977 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/IoTDBLogFileSizeTest.java @@ -20,12 +20,15 @@ package org.apache.iotdb.db.writelog; import java.io.File; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode; import org.apache.iotdb.db.writelog.node.WriteLogNode; @@ -57,7 +60,7 @@ public class IoTDBLogFileSizeTest { return; } groupSize = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte(); - TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte( 8 * 1024 * 1024); + TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(8 * 1024 * 1024); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(8 * 1024 * 1024); EnvironmentUtils.closeStatMonitor(); EnvironmentUtils.envSetUp(); @@ -81,6 +84,11 @@ public class IoTDBLogFileSizeTest { return; } final long[] maxLength = {0}; + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); Thread writeThread = new Thread(() -> { int cnt = 0; try { @@ -100,7 +108,7 @@ public class IoTDBLogFileSizeTest { cnt); statement.execute(sql); WriteLogNode logNode = MultiFileLogNodeManager.getInstance().getNode( - "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX); + "root.logFileTest.seq" + IoTDBConstant.SEQFILE_LOG_NODE_SUFFIX, () -> buffers); File bufferWriteWALFile = new File( logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME); if (bufferWriteWALFile.exists() && bufferWriteWALFile.length() > maxLength[0]) { @@ -117,6 +125,9 @@ public class IoTDBLogFileSizeTest { while (writeThread.isAlive()) { } + for (ByteBuffer byteBuffer : buffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } @Test @@ -124,6 +135,11 @@ public class IoTDBLogFileSizeTest { if (skip) { return; } + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); final long[] maxLength = {0}; Thread writeThread = new Thread(() -> { int cnt = 0; @@ -143,7 +159,7 @@ public class IoTDBLogFileSizeTest { ++cnt, cnt); statement.execute(sql); WriteLogNode logNode = MultiFileLogNodeManager.getInstance() - .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX); + .getNode("root.logFileTest.unsequence" + IoTDBConstant.UNSEQFILE_LOG_NODE_SUFFIX, () -> buffers); File WALFile = new File( logNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME); if (WALFile.exists() && WALFile.length() > maxLength[0]) { @@ -160,6 +176,9 @@ public class IoTDBLogFileSizeTest { while (writeThread.isAlive()) { } + for (ByteBuffer byteBuffer : buffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } private void executeSQL(String[] sqls) throws ClassNotFoundException { diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java index 1d6e918..cc62db0 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.writelog; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.Collections; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -31,6 +33,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -82,13 +85,20 @@ public class PerformanceTest { tempRestore.createNewFile(); tempProcessorStore.createNewFile(); - WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode"); + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + + WriteLogNode logNode = new ExclusiveWriteLogNode("root.testLogNode", byteBuffers); long time = System.currentTimeMillis(); for (int i = 0; i < 1000000; i++) { InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100, new String[]{"s1", "s2", "s3", "s4"}, - new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, + new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, + TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1")); @@ -102,7 +112,10 @@ public class PerformanceTest { 3000000 + " logs use " + (System.currentTimeMillis() - time) + " ms at batch size " + config.getFlushWalThreshold()); - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } tempRestore.delete(); tempProcessorStore.delete(); tempRestore.getParentFile().delete(); @@ -130,25 +143,37 @@ public class PerformanceTest { } catch (MetadataException ignored) { } IoTDB.metaManager - .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.logTestDevice.s1"), TSDataType.DOUBLE, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); IoTDB.metaManager - .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.logTestDevice.s2"), TSDataType.INT32, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); IoTDB.metaManager - .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.logTestDevice.s3"), TSDataType.TEXT, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); IoTDB.metaManager - .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.logTestDevice.s4"), TSDataType.BOOLEAN, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); - WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); + + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers); for (int i = 0; i < 1000000; i++) { InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100, new String[]{"s1", "s2", "s3", "s4"}, - new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, + new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, + TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); - DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1")); + DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, + new PartialPath("root.logTestDevice.s1")); logNode.write(bwInsertPlan); logNode.write(deletePlan); @@ -159,7 +184,10 @@ public class PerformanceTest { System.out.println( 3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovering "); } finally { - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } tempRestore.delete(); tempProcessorStore.delete(); tempRestore.getParentFile().delete(); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java index 332ee6b..cdff9cf 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java @@ -19,12 +19,15 @@ package org.apache.iotdb.db.writelog; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertNotSame; import static junit.framework.TestCase.assertSame; import static junit.framework.TestCase.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.IllegalPathException; @@ -32,11 +35,11 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Path; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -64,15 +67,45 @@ public class WriteLogNodeManagerTest { public void testGetAndDelete() throws IOException { String identifier = "testLogNode"; WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance(); - WriteLogNode logNode = manager.getNode(identifier); + WriteLogNode logNode = manager.getNode(identifier, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); assertEquals(identifier, logNode.getIdentifier()); - WriteLogNode theSameNode = manager.getNode(identifier); + WriteLogNode theSameNode = manager.getNode(identifier, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); assertSame(logNode, theSameNode); - manager.deleteNode(identifier); - WriteLogNode anotherNode = manager.getNode(identifier); + manager.deleteNode(identifier, (ByteBuffer[] array) -> { + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }); + WriteLogNode anotherNode = manager.getNode(identifier, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); assertNotSame(logNode, anotherNode); + manager.deleteNode(identifier, (ByteBuffer[] array) -> { + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }); } @Test @@ -84,17 +117,24 @@ public class WriteLogNodeManagerTest { File tempProcessorStore = File.createTempFile("managerTest", "processorStore"); WriteLogNodeManager manager = MultiFileLogNodeManager.getInstance(); - WriteLogNode logNode = manager - .getNode("root.managerTest"); + WriteLogNode logNode = manager.getNode("root.managerTest", () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); - DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1")); + DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, + new PartialPath("root.logTestDevice.s1")); File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1"); - assertTrue(!walFile.exists()); + assertFalse(walFile.exists()); logNode.write(bwInsertPlan); logNode.write(deletePlan); @@ -102,7 +142,10 @@ public class WriteLogNodeManagerTest { Thread.sleep(config.getForceWalPeriodInMs() + 1000); assertTrue(walFile.exists()); - logNode.delete(); + ByteBuffer[] buffers = logNode.delete(); + for (ByteBuffer byteBuffer : buffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } config.setForceWalPeriodInMs(flushWalPeriod); tempRestore.delete(); tempProcessorStore.delete(); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java index fe5610c..eadeb38 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java @@ -24,6 +24,8 @@ import static junit.framework.TestCase.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -34,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.io.ILogReader; import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode; import org.apache.iotdb.db.writelog.node.WriteLogNode; @@ -68,7 +71,12 @@ public class WriteLogNodeTest { // then reads the logs from file String identifier = "root.logTestDevice"; - WriteLogNode logNode = new ExclusiveWriteLogNode(identifier); + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers); InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100, new String[]{"s1", "s2", "s3", "s4"}, @@ -96,7 +104,7 @@ public class WriteLogNodeTest { } InsertTabletPlan tabletPlan = new InsertTabletPlan(new PartialPath(identifier), - new String[]{"s1", "s2", "s3", "s4"}, dataTypes); + new String[]{"s1", "s2", "s3", "s4"}, dataTypes); tabletPlan.setTimes(times); tabletPlan.setColumns(columns); tabletPlan.setRowCount(times.length); @@ -122,7 +130,10 @@ public class WriteLogNodeTest { assertEquals(newPlan.getMeasurements().length, 3); reader.close(); - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } @Test @@ -131,7 +142,12 @@ public class WriteLogNodeTest { // then calls notifyStartFlush() and notifyEndFlush() to delete old file String identifier = "root.logTestDevice"; - WriteLogNode logNode = new ExclusiveWriteLogNode(identifier); + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode(identifier, byteBuffers); InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath(identifier), 100, new String[]{"s1", "s2", "s3", "s4"}, @@ -159,7 +175,10 @@ public class WriteLogNodeTest { assertFalse(logReader.hasNext()); logReader.close(); - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } @Test @@ -168,13 +187,19 @@ public class WriteLogNodeTest { int flushWalThreshold = config.getFlushWalThreshold(); config.setFlushWalThreshold(2); - WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers); InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice"), 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); - DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1")); + DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, + new PartialPath("root.logTestDevice.s1")); logNode.write(bwInsertPlan); @@ -189,7 +214,10 @@ public class WriteLogNodeTest { } assertTrue(walFile.exists()); - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } config.setFlushWalThreshold(flushWalThreshold); } @@ -198,13 +226,19 @@ public class WriteLogNodeTest { // this test uses a dummy insert log node to insert a few logs and flushes them // then deletes the node - WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice"); + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice", byteBuffers); InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("logTestDevice"), 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", "str", "false"}); - DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new PartialPath("root.logTestDevice.s1")); + DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, + new PartialPath("root.logTestDevice.s1")); logNode.write(bwInsertPlan); logNode.write(deletePlan); @@ -219,16 +253,25 @@ public class WriteLogNodeTest { } assertTrue(new File(logNode.getLogDirectory()).exists()); - logNode.delete(); - assertTrue(!new File(logNode.getLogDirectory()).exists()); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + assertFalse(new File(logNode.getLogDirectory()).exists()); } @Test public void testOverSizedWAL() throws IOException, IllegalPathException { // this test uses a dummy insert log node to insert an over-sized log and assert exception caught - WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize"); - - InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"), 100, + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + WriteLogNode logNode = new ExclusiveWriteLogNode("root.logTestDevice.oversize", byteBuffers); + + InsertRowPlan bwInsertPlan = new InsertRowPlan(new PartialPath("root.logTestDevice.oversize"), + 100, new String[]{"s1", "s2", "s3", "s4"}, new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN}, new String[]{"1.0", "15", new String(new char[65 * 1024 * 1024]), "false"}); @@ -241,6 +284,9 @@ public class WriteLogNodeTest { } assertTrue(caught); - logNode.delete(); + ByteBuffer[] array = logNode.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } } diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java index f7a5c63..a64c754 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java @@ -26,9 +26,12 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; @@ -50,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -109,7 +113,12 @@ public class LogReplayerTest { versionController, tsFileResource, memTable, false); WriteLogNode node = - MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName()); + MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName(), () -> { + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return byteBuffers; + }); node.write( new InsertRowPlan(new PartialPath("root.sg.device0"), 100, "sensor0", TSDataType.INT64, String.valueOf(0))); @@ -124,7 +133,12 @@ public class LogReplayerTest { node.write(deletePlan); node.close(); - replayer.replayLogs(); + replayer.replayLogs(() -> { + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer.allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return byteBuffers; + }); for (int i = 0; i < 5; i++) { ReadOnlyMemChunk memChunk = memTable @@ -175,7 +189,11 @@ public class LogReplayerTest { } } finally { modFile.close(); - MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName()); + MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName(), (ByteBuffer[] byteBuffers) -> { + for (ByteBuffer byteBuffer : byteBuffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }); modF.delete(); tsFile.delete(); tsFile.getParentFile().delete(); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java index 94d4feb..3a0bbe6 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java @@ -21,12 +21,15 @@ package org.apache.iotdb.db.writelog.recover; import static org.junit.Assert.assertEquals; -import java.util.*; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.util.Collections; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -38,6 +41,7 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -87,7 +91,8 @@ public class RecoverResourceFromReaderTest { schema = new Schema(); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { - PartialPath path = new PartialPath("root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j); + PartialPath path = new PartialPath( + "root.sg.device" + i + IoTDBConstant.PATH_SEPARATOR + "sensor" + j); MeasurementSchema measurementSchema = new MeasurementSchema("sensor" + j, TSDataType.INT64, TSEncoding.PLAIN); schema.registerTimeseries(path.toTSFilePath(), measurementSchema); @@ -99,17 +104,20 @@ public class RecoverResourceFromReaderTest { schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor4")), new MeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN)); IoTDB.metaManager - .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.sg.device99.sensor4"), TSDataType.INT64, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor2")), new MeasurementSchema("sensor2", TSDataType.INT64, TSEncoding.PLAIN)); IoTDB.metaManager - .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.sg.device99.sensor2"), TSDataType.INT64, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); schema.registerTimeseries(new Path(("root.sg.device99"), ("sensor1")), new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.PLAIN)); IoTDB.metaManager - .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64, TSEncoding.PLAIN, + .createTimeseries(new PartialPath("root.sg.device99.sensor1"), TSDataType.INT64, + TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); writer = new TsFileWriter(tsF, schema); @@ -134,7 +142,14 @@ public class RecoverResourceFromReaderTest { writer.flushAllChunkGroups(); writer.getIOWriter().close(); - node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName()); + node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> { + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return byteBuffers; + }); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { String[] measurements = new String[10]; @@ -145,16 +160,19 @@ public class RecoverResourceFromReaderTest { types[k] = TSDataType.INT64; values[k] = String.valueOf(k + 10); } - InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements, + InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, + measurements, types, values); node.write(insertRowPlan); } node.notifyStartFlush(); } - InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1, new String[]{"sensor4"}, + InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 1, + new String[]{"sensor4"}, new TSDataType[]{TSDataType.INT64}, new String[]{"4"}); node.write(insertRowPlan); - insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300, new String[]{"sensor2"}, + insertRowPlan = new InsertRowPlan(new PartialPath("root.sg.device99"), 300, + new String[]{"sensor2"}, new TSDataType[]{TSDataType.INT64}, new String[]{"2"}); node.write(insertRowPlan); node.close(); @@ -167,7 +185,10 @@ public class RecoverResourceFromReaderTest { EnvironmentUtils.cleanEnv(); FileUtils.deleteDirectory(tsF.getParentFile()); resource.close(); - node.delete(); + ByteBuffer[] array = node.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } @Test @@ -183,7 +204,18 @@ public class RecoverResourceFromReaderTest { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController, resource, false, false); - performer.recover(true).close(); + performer.recover(true, () -> { + ByteBuffer[] byteBuffers = new ByteBuffer[2]; + byteBuffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + byteBuffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return byteBuffers; + }, (ByteBuffer[] byteBuffers) -> { + for (ByteBuffer byteBuffer : byteBuffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }).close(); assertEquals(1, resource.getStartTime("root.sg.device99")); assertEquals(300, resource.getEndTime("root.sg.device99")); for (int i = 0; i < 10; i++) { diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java index f4ff7cf..5c3ea2d 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java @@ -21,16 +21,18 @@ package org.apache.iotdb.db.writelog.recover; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -42,6 +44,7 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -99,7 +102,8 @@ public class SeqTsFileRecoverTest { for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { IoTDB.metaManager - .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j), TSDataType.INT64, + .createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j), + TSDataType.INT64, TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); } @@ -138,7 +142,14 @@ public class SeqTsFileRecoverTest { writer.flushAllChunkGroups(); writer.getIOWriter().close(); - node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName()); + node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); for (int i = 10; i < 20; i++) { for (int j = 0; j < 10; j++) { String[] measurements = new String[10]; @@ -149,7 +160,8 @@ public class SeqTsFileRecoverTest { types[k] = TSDataType.INT64; values[k] = String.valueOf(k); } - InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, measurements, types, + InsertRowPlan insertPlan = new InsertRowPlan(new PartialPath("root.sg.device" + j), i, + measurements, types, values); node.write(insertPlan); } @@ -164,14 +176,28 @@ public class SeqTsFileRecoverTest { EnvironmentUtils.cleanEnv(); FileUtils.deleteDirectory(tsF.getParentFile()); resource.close(); - node.delete(); + ByteBuffer[] buffers = node.delete(); + for (ByteBuffer byteBuffer : buffers) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } } @Test public void testNonLastRecovery() throws StorageGroupProcessorException, IOException { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController, resource, false, false); - RestorableTsFileIOWriter writer = performer.recover(true); + RestorableTsFileIOWriter writer = performer.recover(true, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }, (ByteBuffer[] array) -> { + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }); assertFalse(writer.canWrite()); writer.close(); @@ -220,7 +246,18 @@ public class SeqTsFileRecoverTest { public void testLastRecovery() throws StorageGroupProcessorException, IOException { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController, resource, false, true); - RestorableTsFileIOWriter writer = performer.recover(true); + RestorableTsFileIOWriter writer = performer.recover(true, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }, (ByteBuffer[] array) -> { + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }); writer.makeMetadataVisible(); assertEquals(11, writer.getMetadatasForQuery().size()); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java index 8caa93a..2d4562f 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java @@ -23,9 +23,12 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.util.Collections; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -39,6 +42,7 @@ import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator; import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.node.WriteLogNode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; @@ -142,7 +146,14 @@ public class UnseqTsFileRecoverTest { writer.flushAllChunkGroups(); writer.getIOWriter().close(); - node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName()); + node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName(), () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }); for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { String[] measurements = new String[10]; @@ -172,7 +183,10 @@ public class UnseqTsFileRecoverTest { public void tearDown() throws IOException, StorageEngineException { FileUtils.deleteDirectory(tsF.getParentFile()); resource.close(); - node.delete(); + ByteBuffer[] array = node.delete(); + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } EnvironmentUtils.cleanEnv(); } @@ -180,7 +194,18 @@ public class UnseqTsFileRecoverTest { public void test() throws StorageGroupProcessorException, IOException { TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController, resource, false, false); - performer.recover(true).close(); + performer.recover(true, () -> { + ByteBuffer[] buffers = new ByteBuffer[2]; + buffers[0] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + buffers[1] = ByteBuffer + .allocateDirect(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2); + return buffers; + }, (ByteBuffer[] array) -> { + for (ByteBuffer byteBuffer : array) { + MmapUtil.clean((MappedByteBuffer) byteBuffer); + } + }).close(); assertEquals(1, resource.getStartTime("root.sg.device99")); assertEquals(300, resource.getEndTime("root.sg.device99"));
