This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch manage_flush_pool in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit fa888914fdbc3d01f297fa7572bfd6461931a5d8 Author: lta <[email protected]> AuthorDate: Wed Jul 24 11:34:32 2019 +0800 reconstruct flush pool class --- .../{storagegroup => flush}/FlushManager.java | 32 ++++++- .../{memtable => flush}/MemTableFlushTask.java | 7 +- .../{memtable => flush}/NotifyFlushMemTable.java | 5 +- .../pool/AbstractPoolManager.java} | 65 ++++++------- .../engine/flush/pool/FlushSubTaskPoolManager.java | 79 ++++++++++++++++ .../db/engine/flush/pool/FlushTaskPoolManager.java | 77 +++++++++++++++ .../iotdb/db/engine/memtable/AbstractMemTable.java | 3 +- .../iotdb/db/engine/pool/FlushPoolManager.java | 104 --------------------- .../db/engine/storagegroup/TsFileProcessor.java | 19 ++-- .../java/org/apache/iotdb/db/service/IoTDB.java | 2 + .../org/apache/iotdb/db/service/ServiceType.java | 3 +- .../iotdb/db/tools/MemEst/MemEstToolCmd.java | 6 +- .../writelog/recover/TsFileRecoverPerformer.java | 2 +- .../db/engine/memtable/MemTableFlushTaskTest.java | 1 + 14 files changed, 239 insertions(+), 166 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java similarity index 70% rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java rename to server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java index 6141ab5..f996c18 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java @@ -16,20 +16,42 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.storagegroup; +package org.apache.iotdb.db.engine.flush; import java.util.concurrent.ConcurrentLinkedDeque; -import org.apache.iotdb.db.engine.pool.FlushPoolManager; +import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; +import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager; +import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; +import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlushManager { +public class FlushManager implements IService { private static final Logger logger = LoggerFactory.getLogger(FlushManager.class); private ConcurrentLinkedDeque<TsFileProcessor> tsFileProcessorQueue = new ConcurrentLinkedDeque<>(); - private FlushPoolManager flushPool = FlushPoolManager.getInstance(); + private FlushTaskPoolManager flushPool = FlushTaskPoolManager.getInstance(); + + @Override + public void start() throws StartupException { + FlushSubTaskPoolManager.getInstance().start(); + FlushTaskPoolManager.getInstance().start(); + } + + @Override + public void stop() { + FlushSubTaskPoolManager.getInstance().stop(); + FlushTaskPoolManager.getInstance().stop(); + } + + @Override + public ServiceType getID() { + return ServiceType.FLUSH_SERVICE; + } class FlushThread implements Runnable { @@ -46,7 +68,7 @@ public class FlushManager { * Add BufferWriteProcessor to asyncTryToFlush manager */ @SuppressWarnings("squid:S2445") - void registerTsFileProcessor(TsFileProcessor tsFileProcessor) { + public void registerTsFileProcessor(TsFileProcessor tsFileProcessor) { synchronized (tsFileProcessor) { if (!tsFileProcessor.isManagedByFlushManager() && tsFileProcessor.getFlushingMemTableSize() > 0) { logger.info("storage group {} begin to submit a flush thread, flushing memtable size: {}", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java rename to server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index e4cddf5..b6cdb97 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -12,13 +12,16 @@ * or implied. See the License for the specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.memtable; +package org.apache.iotdb.db.engine.flush; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager; +import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager; +import org.apache.iotdb.db.engine.memtable.ChunkBufferPool; +import org.apache.iotdb.db.engine.memtable.IMemTable; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; import org.apache.iotdb.db.exception.FlushRunTimeException; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java similarity index 85% rename from server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java rename to server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java index 1862509..295dadf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/NotifyFlushMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java @@ -16,8 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.memtable; +package org.apache.iotdb.db.engine.flush; +import org.apache.iotdb.db.engine.memtable.AbstractMemTable; +import org.apache.iotdb.db.engine.memtable.IMemTable; +import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java similarity index 53% rename from server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java rename to server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java index 243f052..cd11ec1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushSubTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java @@ -16,50 +16,40 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.engine.pool; + +package org.apache.iotdb.db.engine.flush.pool; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.db.concurrent.ThreadName; -import org.apache.iotdb.db.exception.ProcessorException; - -public class FlushSubTaskPoolManager { - - private static final int EXIT_WAIT_TIME = 60 * 1000; - - private ExecutorService pool; +import org.apache.iotdb.db.exception.StartupException; +import org.slf4j.Logger; - private FlushSubTaskPoolManager() { - this.pool = IoTDBThreadPoolFactory - .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName()); - } +public abstract class AbstractPoolManager { + + private static final int WAIT_TIMEOUT = 2000; - public static FlushSubTaskPoolManager getInstance() { - return FlushSubTaskPoolManager.InstanceHolder.instance; - } + protected ExecutorService pool; /** * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end. - * - * @param block if set to true, this method will wait for timeOut milliseconds. - * @param timeout block time out in milliseconds. - * @throws ProcessorException if timeOut is reached or being interrupted while waiting to exit. */ - public void close(boolean block, long timeout) throws ProcessorException { - pool.shutdown(); - if (block) { + public void close() { + Logger logger = getLogger(); + pool.shutdownNow(); + long totalWaitTime = WAIT_TIMEOUT; + logger.info("Waiting for {} thread pool to shut down.", getName()); + while (!pool.isTerminated()) { try { - if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - throw new ProcessorException("Flush thread pool doesn't exit after " - + EXIT_WAIT_TIME + " ms"); + if (!pool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { + logger.info("{} thread pool doesn't exit after {}ms.", getName(), + + totalWaitTime); } + totalWaitTime += WAIT_TIMEOUT; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e); + logger.error("Interrupted while waiting {} thread pool to exit. ", getName(), e); } } } @@ -76,15 +66,6 @@ public class FlushSubTaskPoolManager { return ((ThreadPoolExecutor) pool).getActiveCount(); } - private static class InstanceHolder { - - private InstanceHolder() { - //allowed to do nothing - } - - private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager(); - } - public int getWaitingTasksNumber() { return ((ThreadPoolExecutor) pool).getQueue().size(); } @@ -92,4 +73,12 @@ public class FlushSubTaskPoolManager { public int getCorePoolSize() { return ((ThreadPoolExecutor) pool).getCorePoolSize(); } + + public abstract Logger getLogger(); + + public abstract void start(); + + public abstract void stop(); + + public abstract String getName(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java new file mode 100644 index 0000000..448fb49 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.flush.pool; + +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.service.IService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlushSubTaskPoolManager extends AbstractPoolManager { + + private static final Logger LOGGER = LoggerFactory + .getLogger(FlushSubTaskPoolManager.class); + + private FlushSubTaskPoolManager() { + this.pool = IoTDBThreadPoolFactory + .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName()); + } + + public static FlushSubTaskPoolManager getInstance() { + return FlushSubTaskPoolManager.InstanceHolder.instance; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public String getName() { + return "flush sub task"; + } + + @Override + public void start() { + if (pool == null) { + this.pool = IoTDBThreadPoolFactory + .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName()); + } + LOGGER.info("Flush encoding sub task manager started."); + } + + @Override + public void stop() { + if (pool != null) { + close(); + pool = null; + } + LOGGER.info("Flush encoding sub task manager stopped"); + } + + private static class InstanceHolder { + + private InstanceHolder() { + //allowed to do nothing + } + + private static FlushSubTaskPoolManager instance = new FlushSubTaskPoolManager(); + } + +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java new file mode 100644 index 0000000..aec372c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.flush.pool; + +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlushTaskPoolManager extends AbstractPoolManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlushTaskPoolManager.class); + + private FlushTaskPoolManager() { + int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread(); + pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName()); + } + + public static FlushTaskPoolManager getInstance() { + return InstanceHolder.instance; + } + + @Override + public Logger getLogger() { + return LOGGER; + } + + @Override + public String getName() { + return "flush task"; + } + + @Override + public void start() { + if (pool == null) { + int threadCnt = IoTDBDescriptor.getInstance().getConfig().getConcurrentFlushThread(); + pool = IoTDBThreadPoolFactory + .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName()); + } + LOGGER.info("Flush task manager started."); + } + + @Override + public void stop() { + if (pool != null) { + close(); + pool = null; + } + LOGGER.info("Flush task manager stopped"); + } + + private static class InstanceHolder { + + private InstanceHolder() { + //allowed to do nothing + } + + private static FlushTaskPoolManager instance = new FlushTaskPoolManager(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index d25e913..a4fd2a9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -27,9 +27,8 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; -import org.apache.iotdb.db.utils.MemUtils; -import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.rescon.TVListAllocator; +import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public abstract class AbstractMemTable implements IMemTable { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java deleted file mode 100644 index fcc560b..0000000 --- a/server/src/main/java/org/apache/iotdb/db/engine/pool/FlushPoolManager.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.engine.pool; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.db.concurrent.ThreadName; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.ProcessorException; - -public class FlushPoolManager { - - private static final int EXIT_WAIT_TIME = 60 * 1000; - - private ExecutorService pool; - private int threadCnt; - - private FlushPoolManager() { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - this.threadCnt = config.getConcurrentFlushThread(); - this.pool = IoTDBThreadPoolFactory.newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName()); - } - - public static FlushPoolManager getInstance() { - return InstanceHolder.instance; - } - - /** - * Block new flush submits and exit when all RUNNING THREADS AND TASKS IN THE QUEUE end. - * - * @param block - * if set to true, this method will wait for timeOut milliseconds. - * @param timeout - * block time out in milliseconds. - * @throws ProcessorException - * if timeOut is reached or being interrupted while waiting to exit. - */ - public void close(boolean block, long timeout) throws ProcessorException { - pool.shutdown(); - if (block) { - try { - if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - throw new ProcessorException("Flush thread pool doesn't exit after " - + EXIT_WAIT_TIME + " ms"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProcessorException("Interrupted while waiting flush thread pool to exit. ", e); - } - } - } - - public synchronized Future<?> submit(Runnable task) { - return pool.submit(task); - } - - public synchronized <T>Future<T> submit(Callable<T> task){ - return pool.submit(task); - } - - public int getActiveCnt() { - return ((ThreadPoolExecutor) pool).getActiveCount(); - } - - public int getThreadCnt() { - return threadCnt; - } - - private static class InstanceHolder { - private InstanceHolder(){ - //allowed to do nothing - } - private static FlushPoolManager instance = new FlushPoolManager(); - } - - public int getWaitingTasksNumber() { - return ((ThreadPoolExecutor) pool).getQueue().size(); - } - - public int getCorePoolSize() { - return ((ThreadPoolExecutor) pool).getCorePoolSize(); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index d27db69..34dcbac 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -31,11 +31,11 @@ import java.util.function.Supplier; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.CompressionRatio; -import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.flush.FlushManager; +import org.apache.iotdb.db.engine.flush.MemTableFlushTask; +import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger; -import org.apache.iotdb.db.engine.memtable.MemTableFlushTask; -import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; @@ -364,7 +364,7 @@ public class TsFileProcessor { * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of * the flush manager pool */ - void flushOneMemTable() { + public void flushOneMemTable() { IMemTable memTableToFlush; memTableToFlush = flushingMemTables.getFirst(); @@ -417,7 +417,8 @@ public class TsFileProcessor { } endFile(); } catch (IOException | TsFileProcessorException e) { - logger.error("meet error when flush FileMetadata to {}, change system mode to read-only", tsFileResource.getFile().getAbsolutePath()); + logger.error("meet error when flush FileMetadata to {}, change system mode to read-only", + tsFileResource.getFile().getAbsolutePath()); IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); try { writer.reset(); @@ -458,7 +459,7 @@ public class TsFileProcessor { } - boolean isManagedByFlushManager() { + public boolean isManagedByFlushManager() { return managedByFlushManager; } @@ -480,11 +481,11 @@ public class TsFileProcessor { } } - void setManagedByFlushManager(boolean managedByFlushManager) { + public void setManagedByFlushManager(boolean managedByFlushManager) { this.managedByFlushManager = managedByFlushManager; } - int getFlushingMemTableSize() { + public int getFlushingMemTableSize() { return flushingMemTables.size(); } @@ -496,7 +497,7 @@ public class TsFileProcessor { return writer; } - String getStorageGroupName() { + public String getStorageGroupName() { return storageGroupName; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 2a9d19a..0c70887 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter; import org.apache.iotdb.db.cost.statistic.Measurement; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.StatMonitor; @@ -90,6 +91,7 @@ public class IoTDB implements IoTDBMBean { } initMManager(); + registerManager.register(FlushManager.getInstance()); registerManager.register(StorageEngine.getInstance()); registerManager.register(MultiFileLogNodeManager.getInstance()); registerManager.register(JMXService.getInstance()); diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index 4f872fd..bff3e49 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -32,7 +32,8 @@ public enum ServiceType { FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), SYNC_SERVICE("SYNC ServerService", ""), PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"), - TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""); + TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""), + FLUSH_SERVICE("Flush ServerService", ""); private String name; private String jmxName; diff --git a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java index 9c5d96d..6159232 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/MemEst/MemEstToolCmd.java @@ -73,12 +73,12 @@ public class MemEstToolCmd implements Runnable { MManager.getInstance().clear(); long sgCnt = 1; - long tsCnt = 1; + long tsCnt = 0; try { for (; sgCnt <= sgNum; sgCnt++) { IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1); } - for (; tsCnt <= tsNum; tsCnt++) { + for (; tsCnt < tsNum; tsCnt++) { IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1); if (maxTsNum == 0) { maxTsNumValid = tsCnt / sgNum + 1; @@ -91,7 +91,7 @@ public class MemEstToolCmd implements Runnable { } catch (ConfigAdjusterException e) { if (sgCnt > sgNum) { - maxProcess = Math.max(maxProcess, tsCnt * 100 / tsNum); + maxProcess = Math.max(maxProcess, (tsCnt + 1) * 100 / tsNum); System.out .print(String.format("Memory estimation progress : %d%%\r", maxProcess)); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index c6dc976..d012e3d 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -27,7 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import org.apache.iotdb.db.engine.memtable.IMemTable; -import org.apache.iotdb.db.engine.memtable.MemTableFlushTask; +import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.version.VersionController; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java index e2b0155..6306338 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.util.concurrent.ExecutionException; import org.apache.iotdb.db.engine.MetadataManagerHelper; +import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
