This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_overflow in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 39e0889107dea42466bc74e43a4d419473619f52 Author: 江天 <[email protected]> AuthorDate: Tue May 28 12:17:29 2019 +0800 migrate functions of FileNodeManager to StorageGroupManager --- .../iotdb/db/engine/filenode/FileNodeManager.java | 5 +- .../db/engine/filenode/FileNodeProcessor.java | 13 +- .../db/engine/sgmanager/StorageGroupManager.java | 412 ++++++++++----------- .../db/engine/sgmanager/StorageGroupProcessor.java | 12 +- .../db/engine/tsfiledata/TsFileProcessor.java | 4 +- .../apache/iotdb/db/monitor/MonitorConstants.java | 4 +- .../org/apache/iotdb/db/monitor/StatMonitor.java | 4 +- 7 files changed, 220 insertions(+), 234 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index e291e88..0404a5c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.IStatistic; import org.apache.iotdb.db.monitor.MonitorConstants; import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupManagerStatConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -301,10 +302,10 @@ public class FileNodeManager implements IStatistic, IService { // Modify the insert if (!isMonitor) { fileNodeProcessor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) + .get(StorageGroupProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) .addAndGet(tsRecord.dataPointList.size()); fileNodeProcessor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()) + .get(StorageGroupProcessorStatConstants.TOTAL_REQ_SUCCESS.name()) .incrementAndGet(); statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_SUCCESS.name()) .incrementAndGet(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 508d689..a66116e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -76,6 +76,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.IStatistic; import org.apache.iotdb.db.monitor.MonitorConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; @@ -246,8 +247,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { FileNodeProcessor(String fileNodeDirPath, String processorName) throws FileNodeProcessorException { super(processorName); - for (MonitorConstants.FileNodeProcessorStatConstants statConstant : - MonitorConstants.FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants statConstant : + StorageGroupProcessorStatConstants.values()) { statParamsHashMap.put(statConstant.name(), new AtomicLong(0)); } statStorageDeltaName = @@ -327,8 +328,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { @Override public void registerStatMetadata() { Map<String, String> hashMap = new HashMap<>(); - for (MonitorConstants.FileNodeProcessorStatConstants statConstant : - MonitorConstants.FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants statConstant : + StorageGroupProcessorStatConstants.values()) { hashMap .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(), MonitorConstants.DATA_TYPE_INT64); @@ -339,8 +340,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { @Override public List<String> getAllPathForStatistic() { List<String> list = new ArrayList<>(); - for (MonitorConstants.FileNodeProcessorStatConstants statConstant : - MonitorConstants.FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants statConstant : + StorageGroupProcessorStatConstants.values()) { list.add( statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name()); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java index 51ed0e3..d313c38 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupManager.java @@ -36,15 +36,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; -import org.apache.iotdb.db.engine.Processor; import org.apache.iotdb.db.engine.filenode.FileNodeProcessor; import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; import org.apache.iotdb.db.engine.pool.FlushManager; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.exception.BufferWriteProcessorException; import org.apache.iotdb.db.exception.StorageGroupManagerException; -import org.apache.iotdb.db.exception.FileNodeProcessorException; +import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.exception.TsFileProcessorException; @@ -80,7 +78,7 @@ public class StorageGroupManager implements IStatistic, IService { private static final Directories directories = Directories.getInstance(); /** - * This map is used to manage all StorageGroupProcessors,<br> the key is filenode name which is + * This map is used to manage all StorageGroupProcessors,<br> the key is storage group name which is * storage group seriesPath. * e.g. if the user executes a query "SET STORAGE GROUP TO root.beijing", then "root.beijing" will * be a key. @@ -88,9 +86,9 @@ public class StorageGroupManager implements IStatistic, IService { private ConcurrentHashMap<String, StorageGroupProcessor> processorMap; /** - * fileNodeManagerStatus indicates whether the Manager is merging or being closed. + * storageGroupManagerStatus indicates whether the Manager is merging or being closed. */ - private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE; + private volatile StorageGroupManagerStatus storageGroupManagerStatus = StorageGroupManagerStatus.NONE; private HashMap<String, AtomicLong> statParamsHashMap; @@ -101,9 +99,9 @@ public class StorageGroupManager implements IStatistic, IService { private void initStat() { statParamsHashMap = new HashMap<>(); - for (StorageGroupManagerStatConstants fileNodeManagerStatConstant : + for (StorageGroupManagerStatConstants StorageGroupManagerStatConstant : StorageGroupManagerStatConstants.values()) { - statParamsHashMap.put(fileNodeManagerStatConstant.name(), new AtomicLong(0)); + statParamsHashMap.put(StorageGroupManagerStatConstant.name(), new AtomicLong(0)); } if (TsFileDBConf.isEnableStatMonitor()) { @@ -114,7 +112,7 @@ public class StorageGroupManager implements IStatistic, IService { } public static StorageGroupManager getInstance() { - return FileNodeManagerHolder.INSTANCE; + return StorageGroupManagerHolder.INSTANCE; } private void updateStatHashMapWhenFail(TSRecord tsRecord) { @@ -174,7 +172,7 @@ public class StorageGroupManager implements IStatistic, IService { /** * This function is just for unit test. */ - public synchronized void resetFileNodeManager() { + public synchronized void resetStorageGroupManager() { for (String key : statParamsHashMap.keySet()) { statParamsHashMap.put(key, new AtomicLong()); } @@ -182,49 +180,49 @@ public class StorageGroupManager implements IStatistic, IService { } /** - * @param filenodeName storage name, e.g., root.a.b + * @param processorName storage name, e.g., root.a.b */ - private StorageGroupProcessor constructNewProcessor(String filenodeName) + private StorageGroupProcessor constructNewProcessor(String processorName) throws StorageGroupManagerException { try { - return new StorageGroupProcessor(filenodeName); + return new StorageGroupProcessor(processorName); } catch (TsFileProcessorException e) { throw new StorageGroupManagerException(String.format("Can't construct the " - + "StorageGroupProcessor, the StorageGroup is %s", filenodeName), e); + + "StorageGroupProcessor, the StorageGroup is %s", processorName), e); } } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private StorageGroupProcessor getProcessor(String path, boolean isWriteLock) throws StorageGroupManagerException { - String filenodeName; + String storageGroupName; try { // return the stroage name - filenodeName = MManager.getInstance().getStorageGroupByPath(path); + storageGroupName = MManager.getInstance().getStorageGroupByPath(path); } catch (PathErrorException e) { throw new StorageGroupManagerException( String.format("MManager get StorageGroup name error, seriesPath is %s", path), e); } StorageGroupProcessor processor; - processor = processorMap.get(filenodeName); + processor = processorMap.get(storageGroupName); if (processor != null) { processor.lock(isWriteLock); } else { - filenodeName = filenodeName.intern(); + storageGroupName = storageGroupName.intern(); // calculate the value with same key synchronously - synchronized (filenodeName) { - processor = processorMap.get(filenodeName); + synchronized (storageGroupName) { + processor = processorMap.get(storageGroupName); if (processor != null) { processor.lock(isWriteLock); } else { // calculate the value with the key monitor if (LOGGER.isDebugEnabled()) { - LOGGER.debug("construct a processor instance, the filenode is {}, Thread is {}", - filenodeName, Thread.currentThread().getId()); + LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}", + storageGroupName, Thread.currentThread().getId()); } - processor = constructNewProcessor(filenodeName); + processor = constructNewProcessor(storageGroupName); processor.lock(isWriteLock); - processorMap.put(filenodeName, processor); + processorMap.put(storageGroupName, processor); } } } @@ -235,22 +233,22 @@ public class StorageGroupManager implements IStatistic, IService { * recover the StorageGroupProcessors. */ public void recover() throws StorageGroupManagerException { - List<String> filenodeNames; + List<String> storageGroupNames; try { - filenodeNames = MManager.getInstance().getAllStorageGroups(); + storageGroupNames = MManager.getInstance().getAllStorageGroups(); } catch (PathErrorException e) { - throw new StorageGroupManagerException("Restoring all FileNodes failed.", e); + throw new StorageGroupManagerException("Restoring all StorageGroups failed.", e); } - for (String filenodeName : filenodeNames) { - StorageGroupProcessor fileNodeProcessor = null; + for (String storageGroupName : storageGroupNames) { + StorageGroupProcessor StorageGroupProcessor = null; try { - fileNodeProcessor = getProcessor(filenodeName, true); + StorageGroupProcessor = getProcessor(storageGroupName, true); } catch (StorageGroupManagerException e) { - throw new StorageGroupManagerException(String.format("Restoring fileNode %s failed.", - filenodeName), e); + throw new StorageGroupManagerException(String.format("Restoring StorageGroup %s failed.", + storageGroupName), e); } finally { - if (fileNodeProcessor != null) { - fileNodeProcessor.writeUnlock(); + if (StorageGroupProcessor != null) { + StorageGroupProcessor.writeUnlock(); } } } @@ -287,10 +285,10 @@ public class StorageGroupManager implements IStatistic, IService { // Modify the insert if (!isMonitor) { processor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) + .get(MonitorConstants.StorageGroupProcessorStatConstants.TOTAL_POINTS_SUCCESS.name()) .addAndGet(plan.getValues().length); processor.getStatParamsHashMap() - .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name()) + .get(MonitorConstants.StorageGroupProcessorStatConstants.TOTAL_REQ_SUCCESS.name()) .incrementAndGet(); statParamsHashMap.get(StorageGroupManagerStatConstants.TOTAL_REQ_SUCCESS.name()) .incrementAndGet(); @@ -361,7 +359,7 @@ public class StorageGroupManager implements IStatistic, IService { } private void deleteProcessor(String processorName, - Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator) + Iterator<Map.Entry<String, StorageGroupProcessor>> processorIterator) throws StorageGroupManagerException { if (!processorMap.containsKey(processorName)) { LOGGER.warn("The processorMap doesn't contain the StorageGroupProcessor {}.", processorName); @@ -431,7 +429,7 @@ public class StorageGroupManager implements IStatistic, IService { StorageGroupProcessor processor = getProcessor(deviceId, true); try { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Get the FileNodeProcessor: storage group is {}, begin query.", + LOGGER.debug("Get the StorageGroupProcessor: storage group is {}, begin query.", processor.getProcessorName()); } return processor.addMultiPassCount(); @@ -449,7 +447,7 @@ public class StorageGroupManager implements IStatistic, IService { StorageGroupProcessor processorrocessor = getProcessor(deviceId, true); try { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Get the FileNodeProcessor: {} end query.", + LOGGER.debug("Get the StorageGroupProcessor: {} end query.", processorrocessor.getProcessorName()); } processorrocessor.decreaseMultiPassCount(token); @@ -471,7 +469,7 @@ public class StorageGroupManager implements IStatistic, IService { String deviceId = seriesExpression.getSeriesPath().getDevice(); StorageGroupProcessor processor = getProcessor(deviceId, false); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.", + LOGGER.debug("Get the StorageGroupProcessor: storage group is {}, query.", processor.getProcessorName()); } try { @@ -492,12 +490,12 @@ public class StorageGroupManager implements IStatistic, IService { * Append one specified tsfile to the storage group. <b>This method is only provided for * transmission module</b> * - * @param fileNodeName the seriesPath of storage group + * @param storageGroupName the seriesPath of storage group * @param appendFile the appended tsfile information */ - public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile, + public boolean appendFileToStorageGroup(String storageGroupName, TsFileResource appendFile, String appendFilePath) throws StorageGroupManagerException { - StorageGroupProcessor processor = getProcessor(fileNodeName, true); + StorageGroupProcessor processor = getProcessor(storageGroupName, true); try { // check append file for (Map.Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) { @@ -510,7 +508,7 @@ public class StorageGroupManager implements IStatistic, IService { // append file to storage group. processor.appendFile(appendFile, appendFilePath); } catch (TsFileProcessorException e) { - LOGGER.error("Cannot append the file {} to {}", appendFile.getFile().getAbsolutePath(), fileNodeName, e); + LOGGER.error("Cannot append the file {} to {}", appendFile.getFile().getAbsolutePath(), storageGroupName, e); throw new StorageGroupManagerException(e); } finally { processor.writeUnlock(); @@ -521,63 +519,79 @@ public class StorageGroupManager implements IStatistic, IService { /** * get all overlap tsfiles which are conflict with the appendFile. * - * @param fileNodeName the seriesPath of storage group + * @param storageGroupName the seriesPath of storage group * @param appendFile the appended tsfile information */ - public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile, + public List<String> getOverlapFilesFromStorageGroup(String storageGroupName, TsFileResource appendFile, String uuid) throws StorageGroupManagerException { - FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true); + StorageGroupProcessor processor = getProcessor(storageGroupName, true); List<String> overlapFiles; try { - overlapFiles = fileNodeProcessor.getOverlapFiles(appendFile, uuid); - } catch (FileNodeProcessorException e) { + overlapFiles = processor.getOverlapFiles(appendFile, uuid); + } catch (TsFileProcessorException e) { throw new StorageGroupManagerException(e); } finally { - fileNodeProcessor.writeUnlock(); + processor.writeUnlock(); } return overlapFiles; } /** - * merge all overflowed filenode. + * merge all overflowed storage group. * * @throws StorageGroupManagerException StorageGroupManagerException */ public void mergeAll() throws StorageGroupManagerException { - if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) { - LOGGER.warn("Failed to merge all overflowed filenode, because filenode manager status is {}", - fileNodeManagerStatus); + if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { + LOGGER.warn("Unable to merge all storage groups when the status is {}", + storageGroupManagerStatus); return; } - fileNodeManagerStatus = FileNodeManagerStatus.MERGE; - LOGGER.info("Start to merge all overflowed filenode"); - List<String> allFileNodeNames; + storageGroupManagerStatus = StorageGroupManagerStatus.MERGE; + LOGGER.info("Start to merge all storage groups"); + List<String> storageGroupNames; try { - allFileNodeNames = MManager.getInstance().getAllStorageGroups(); + storageGroupNames = MManager.getInstance().getAllStorageGroups(); } catch (PathErrorException e) { - LOGGER.error("Get all storage group seriesPath error,", e); throw new StorageGroupManagerException(e); } - List<Future<?>> futureTasks = new ArrayList<>(); - for (String fileNodeName : allFileNodeNames) { - FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true); + List<Future> futureTasks = new ArrayList<>(); + for (String storageGroupName : storageGroupNames) { + StorageGroupProcessor processor = getProcessor(storageGroupName, true); try { - Future<?> task = fileNodeProcessor.submitToMerge(); + Future task = processor.submitToMerge(); if (task != null) { - LOGGER.info("Submit the filenode {} to the merge pool", fileNodeName); + LOGGER.info("Submit the storage group {} to the merge pool", storageGroupName); futureTasks.add(task); } } finally { - fileNodeProcessor.writeUnlock(); + processor.writeUnlock(); + } + } + List<Exception> mergeExceptions = new ArrayList<>(); + waitForMergeTasks(futureTasks, mergeExceptions); + + if (!mergeExceptions.isEmpty()) { + LOGGER.error("There are {} errors when merging storage groups:", mergeExceptions.size()); + for (Exception exception : mergeExceptions) { + LOGGER.error("", exception); } + throw new StorageGroupManagerException(String.format("Merge encountered %d errors," + + " see previous logs for details", mergeExceptions.size())); } + storageGroupManagerStatus = StorageGroupManagerStatus.NONE; + LOGGER.info("End to merge all storage groups"); + } + + private void waitForMergeTasks(List<Future> tasks, List<Exception> exceptions) { long totalTime = 0; // loop waiting for merge to end, the longest waiting time is // 60s. int time = 2; - List<Exception> mergeException = new ArrayList<>(); - for (Future<?> task : futureTasks) { + int singleWaitThreshold = 60; + + for (Future<?> task : tasks) { while (!task.isDone()) { try { LOGGER.info( @@ -586,42 +600,36 @@ public class StorageGroupManager implements IStatistic, IService { totalTime, time); TimeUnit.SECONDS.sleep(time); totalTime += time; - time = updateWaitTime(time); + time = updateWaitTime(time, singleWaitThreshold); } catch (InterruptedException e) { - LOGGER.error("Unexpected interruption {}", e); + LOGGER.error("Unexpected interruption when waiting for merge", e); Thread.currentThread().interrupt(); } } try { task.get(); } catch (InterruptedException e) { - LOGGER.error("Unexpected interruption {}", e); + LOGGER.error("Unexpected interruption when receiving merge result", e); + Thread.currentThread().interrupt(); } catch (ExecutionException e) { - mergeException.add(e); - LOGGER.error("The exception for merge: {}", e); + exceptions.add(e); } } - if (!mergeException.isEmpty()) { - // just throw the first exception - throw new StorageGroupManagerException(mergeException.get(0)); - } - fileNodeManagerStatus = FileNodeManagerStatus.NONE; - LOGGER.info("End to merge all overflowed filenode"); } - private int updateWaitTime(int time) { - return time < 32 ? time * 2 : 60; + private int updateWaitTime(int time, int threshold) { + return time * 2 < threshold ? time * 2 : threshold; } /** - * try to close the filenode processor. The name of filenode processor is processorName + * try to close the storage group processor. The name of storage group processor is processorName */ private boolean closeOneProcessor(String processorName) throws StorageGroupManagerException { if (!processorMap.containsKey(processorName)) { return true; } - Processor processor = processorMap.get(processorName); + StorageGroupProcessor processor = processorMap.get(processorName); if (processor.tryWriteLock()) { try { if (processor.canBeClosed()) { @@ -630,8 +638,8 @@ public class StorageGroupManager implements IStatistic, IService { } else { return false; } - } catch (ProcessorException e) { - LOGGER.error("Close the filenode processor {} error.", processorName, e); + } catch (TsFileProcessorException e) { + LOGGER.error("Close the storage group processor {} error.", processorName, e); throw new StorageGroupManagerException(e); } finally { processor.writeUnlock(); @@ -642,87 +650,91 @@ public class StorageGroupManager implements IStatistic, IService { } /** - * delete one filenode. + * delete one storage group. */ - public void deleteOneFileNode(String processorName) throws StorageGroupManagerException { - if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) { + public void deleteOneStorageGroup(String processorName) throws StorageGroupManagerException { + if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { return; } - fileNodeManagerStatus = FileNodeManagerStatus.CLOSE; + storageGroupManagerStatus = StorageGroupManagerStatus.CLOSE; try { if (processorMap.containsKey(processorName)) { - deleteFileNodeBlocked(processorName); + deleteStorageGroupBlocked(processorName); } - String fileNodePath = TsFileDBConf.getFileNodeDir(); - fileNodePath = standardizeDir(fileNodePath) + processorName; - FileUtils.deleteDirectory(new File(fileNodePath)); - cleanBufferWrite(processorName); + cleanDataFiles(processorName); MultiFileLogNodeManager.getInstance() .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX); MultiFileLogNodeManager.getInstance() .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX); } catch (IOException e) { - LOGGER.error("Delete the filenode processor {} error.", processorName, e); + LOGGER.error("Delete the storage group processor {} error.", processorName, e); throw new StorageGroupManagerException(e); } finally { - fileNodeManagerStatus = FileNodeManagerStatus.NONE; + storageGroupManagerStatus = StorageGroupManagerStatus.NONE; } } - private void cleanBufferWrite(String processorName) throws IOException { - List<String> bufferwritePathList = directories.getAllTsFileFolders(); - for (String bufferwritePath : bufferwritePathList) { - bufferwritePath = standardizeDir(bufferwritePath) + processorName; - File bufferDir = new File(bufferwritePath); + private void cleanDataFiles(String processorName) throws IOException { + List<String> seqFilePathList = directories.getAllTsFileFolders(); + List<String> overflowFilePathList = directories.getAllOverflowFileFolders(); + cleanDirectories(seqFilePathList, processorName); + cleanDirectories(overflowFilePathList, processorName); + } + + private void cleanDirectories(List<String> directoryPaths, String processorName) throws IOException { + for (String directoryPath : directoryPaths) { + directoryPath = standardizeDir(directoryPath) + processorName; + File directory = new File(directoryPath); // free and close the streams under this bufferwrite directory - if (!bufferDir.exists()) { + if (!directory.exists()) { continue; } - File[] bufferFiles = bufferDir.listFiles(); - if (bufferFiles != null) { - for (File bufferFile : bufferFiles) { - FileReaderManager.getInstance().closeFileAndRemoveReader(bufferFile.getPath()); + File[] dataFiles = directory.listFiles(); + if (dataFiles != null) { + for (File dataFile : dataFiles) { + FileReaderManager.getInstance().closeFileAndRemoveReader(dataFile.getPath()); } } - FileUtils.deleteDirectory(new File(bufferwritePath)); + FileUtils.deleteDirectory(new File(directoryPath)); } } - private void deleteFileNodeBlocked(String processorName) throws StorageGroupManagerException { - LOGGER.info("Forced to delete the filenode processor {}", processorName); - FileNodeProcessor processor = processorMap.get(processorName); + private void deleteStorageGroupBlocked(String processorName) throws StorageGroupManagerException { + LOGGER.info("Forced to delete the storage group processor {}", processorName); + StorageGroupProcessor processor = processorMap.get(processorName); while (true) { if (processor.tryWriteLock()) { try { if (processor.canBeClosed()) { - LOGGER.info("Delete the filenode processor {}.", processorName); - processor.delete(); + LOGGER.info("Delete the storage group processor {}.", processorName); + processor.close(); processorMap.remove(processorName); break; } else { LOGGER.info( - "Can't delete the filenode processor {}, " - + "because the filenode processor can't be closed." + "Can't delete the storage group processor {}, " + + "because the storage group processor can't be closed." + " Wait 100ms to retry"); } - } catch (ProcessorException e) { - LOGGER.error("Delete the filenode processor {} error.", processorName, e); - throw new StorageGroupManagerException(e); + } catch (TsFileProcessorException e) { + throw new StorageGroupManagerException(String. + format("Delete the storage group processor %s error.", processorName), e); } finally { processor.writeUnlock(); } } else { LOGGER.info( - "Can't delete the filenode processor {}, because it can't get the write lock." + "Can't delete the storage group processor {}, because it is already locked" + " Wait 100ms to retry", processorName); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { - LOGGER.error(e.getMessage()); + LOGGER.error("Unexpected interruption when waiting to delete storage group {}", + processorName); Thread.currentThread().interrupt(); } } @@ -744,67 +756,68 @@ public class StorageGroupManager implements IStatistic, IService { public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding, CompressionType compressor, Map<String, String> props) throws StorageGroupManagerException { - FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true); + StorageGroupProcessor processor = getProcessor(path.getFullPath(), true); try { - fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props); + processor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props); } finally { - fileNodeProcessor.writeUnlock(); + processor.writeUnlock(); } } /** - * Force to close the filenode processor. + * Force to close the storage group processor. */ - public void closeOneFileNode(String processorName) throws StorageGroupManagerException { - if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) { + public void closeStorageGroup(String processorName) throws StorageGroupManagerException { + if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { return; } - fileNodeManagerStatus = FileNodeManagerStatus.CLOSE; + storageGroupManagerStatus = StorageGroupManagerStatus.CLOSE; try { - LOGGER.info("Force to close the filenode processor {}.", processorName); + LOGGER.info("Force to close the storage group processor {}.", processorName); while (!closeOneProcessor(processorName)) { try { - LOGGER.info("Can't force to close the filenode processor {}, wait 100ms to retry", + LOGGER.info("Can't force to close the storage group processor {}, wait 100ms to retry", processorName); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // ignore the interrupted exception - LOGGER.error("Unexpected interruption {}", e); + LOGGER.error("Unexpected interruption {} when closing storage group {}",processorName, + e); Thread.currentThread().interrupt(); } } } finally { - fileNodeManagerStatus = FileNodeManagerStatus.NONE; + storageGroupManagerStatus = StorageGroupManagerStatus.NONE; } } /** - * try to close the filenode processor. + * try to close the storage group processor. */ - private void close(String processorName) throws StorageGroupManagerException { + private void closeStorageGroupPrivate(String processorName) throws StorageGroupManagerException { if (!processorMap.containsKey(processorName)) { - LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName); + LOGGER.warn("The processorMap doesn't contain the storage group {}.", processorName); return; } - LOGGER.info("Try to close the filenode processor {}.", processorName); - FileNodeProcessor processor = processorMap.get(processorName); + LOGGER.info("Try to close the storage group processor {}.", processorName); + StorageGroupProcessor processor = processorMap.get(processorName); if (!processor.tryWriteLock()) { - LOGGER.warn("Can't get the write lock of the filenode processor {}.", processorName); + LOGGER.warn("Can't get the write lock of the storage group processor {}.", processorName); return; } try { if (processor.canBeClosed()) { try { - LOGGER.info("Close the filenode processor {}.", processorName); + LOGGER.info("Close the storage group processor {}.", processorName); processor.close(); - } catch (ProcessorException e) { - LOGGER.error("Close the filenode processor {} error.", processorName, e); - throw new StorageGroupManagerException(e); + } catch (TsFileProcessorException e) { + throw new StorageGroupManagerException(String + .format("Close the storage group processor %s error.", processorName), e); } } else { - LOGGER.warn("The filenode processor {} can't be closed.", processorName); + LOGGER.warn("The storage group processor {} can't be closed.", processorName); } } finally { processor.writeUnlock(); @@ -812,27 +825,27 @@ public class StorageGroupManager implements IStatistic, IService { } /** - * delete all filenode. + * delete all storage groups. */ public synchronized boolean deleteAll() throws StorageGroupManagerException { - LOGGER.info("Start deleting all filenode"); - if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) { - LOGGER.info("Failed to delete all filenode processor because of merge operation"); + LOGGER.info("Start deleting all storage group"); + if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { + LOGGER.info("Failed to delete all storage group processor because of merge operation"); return false; } - fileNodeManagerStatus = FileNodeManagerStatus.CLOSE; + storageGroupManagerStatus = StorageGroupManagerStatus.CLOSE; try { - Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator = processorMap.entrySet() + Iterator<Map.Entry<String, StorageGroupProcessor>> processorIterator = processorMap.entrySet() .iterator(); while (processorIterator.hasNext()) { - Map.Entry<String, FileNodeProcessor> processorEntry = processorIterator.next(); + Map.Entry<String, StorageGroupProcessor> processorEntry = processorIterator.next(); deleteProcessor(processorEntry.getKey(), processorIterator); } return processorMap.isEmpty(); } finally { - LOGGER.info("Deleting all FileNodeProcessors ends"); - fileNodeManagerStatus = FileNodeManagerStatus.NONE; + LOGGER.info("Deleting all StorageGroupProcessors ends"); + storageGroupManagerStatus = StorageGroupManagerStatus.NONE; } } @@ -840,19 +853,19 @@ public class StorageGroupManager implements IStatistic, IService { * Try to close All. */ public void closeAll() throws StorageGroupManagerException { - LOGGER.info("Start closing all filenode processor"); - if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) { - LOGGER.info("Failed to close all filenode processor because of merge operation"); + LOGGER.info("Start closing all storage group processor"); + if (storageGroupManagerStatus != StorageGroupManagerStatus.NONE) { + LOGGER.info("Failed to close all storage group processor because of merge operation"); return; } - fileNodeManagerStatus = FileNodeManagerStatus.CLOSE; + storageGroupManagerStatus = StorageGroupManagerStatus.CLOSE; try { - for (Map.Entry<String, FileNodeProcessor> processorEntry : processorMap.entrySet()) { - close(processorEntry.getKey()); + for (Map.Entry<String, StorageGroupProcessor> processorEntry : processorMap.entrySet()) { + closeStorageGroupPrivate(processorEntry.getKey()); } } finally { - LOGGER.info("Close all FileNodeProcessors ends"); - fileNodeManagerStatus = FileNodeManagerStatus.NONE; + LOGGER.info("Close all StorageGroupProcessors ends"); + storageGroupManagerStatus = StorageGroupManagerStatus.NONE; } } @@ -860,8 +873,6 @@ public class StorageGroupManager implements IStatistic, IService { * force flush to control memory usage. */ public void forceFlush(BasicMemController.UsageLevel level) { - // you may add some delicate process like below - // or you could provide multiple methods for different urgency switch (level) { // only select the most urgent (most active or biggest in size) // processors to flush @@ -870,7 +881,7 @@ public class StorageGroupManager implements IStatistic, IService { try { flushTop(0.1f); } catch (IOException e) { - LOGGER.error("force flush memory data error: {}", e); + LOGGER.error("force flush memory data error in waring level: ", e); } break; // force all processors to flush @@ -878,34 +889,31 @@ public class StorageGroupManager implements IStatistic, IService { try { flushAll(); } catch (IOException e) { - LOGGER.error("force flush memory data error: {}", e); + LOGGER.error("force flush memory data error in dangerous level: ", e); } break; - // if the flush thread pool is not full ( or half full), start a new - // flush task case SAFE: - if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance() - .getThreadCnt()) { - try { - flushTop(0.01f); - } catch (IOException e) { - LOGGER.error("force flush memory data error: ", e); - } - } break; default: + throw new UnsupportedOperationException("Unreachable"); } } private void flushAll() throws IOException { - for (FileNodeProcessor processor : processorMap.values()) { + for (StorageGroupProcessor processor : processorMap.values()) { if (!processor.tryLock(true)) { continue; } try { - boolean isMerge = processor.flush().isHasOverflowFlushTask(); - if (isMerge) { - processor.submitToMerge(); + try { + processor.flush().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Un expected interruption when flushing storage group {}", + processor.getProcessorName(), e); + } catch (ExecutionException e) { + LOGGER.error("Error occurred when flushing storage group {}", + processor.getProcessorName(), e); } } finally { processor.unlock(true); @@ -914,7 +922,7 @@ public class StorageGroupManager implements IStatistic, IService { } private void flushTop(float percentage) throws IOException { - List<FileNodeProcessor> tempProcessors = new ArrayList<>(processorMap.values()); + List<StorageGroupProcessor> tempProcessors = new ArrayList<>(processorMap.values()); // sort the tempProcessors as descending order tempProcessors.sort((o1, o2) -> (int) (o2.memoryUsage() - o1.memoryUsage())); int flushNum = @@ -922,16 +930,21 @@ public class StorageGroupManager implements IStatistic, IService { ? (int) (tempProcessors.size() * percentage) : 1; for (int i = 0; i < flushNum && i < tempProcessors.size(); i++) { - FileNodeProcessor processor = tempProcessors.get(i); - // 64M + StorageGroupProcessor processor = tempProcessors.get(i); if (processor.memoryUsage() <= TSFileConfig.groupSizeInByte / 2) { continue; } processor.writeLock(); try { - boolean isMerge = processor.flush().isHasOverflowFlushTask(); - if (isMerge) { - processor.submitToMerge(); + try { + processor.flush().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Unexpected interruption when flushing storage group {}", + processor.getProcessorName(), e); + } catch (ExecutionException e) { + LOGGER.error("Error occurred when flushing storage group {}", + processor.getProcessorName(), e); } } finally { processor.writeUnlock(); @@ -958,46 +971,17 @@ public class StorageGroupManager implements IStatistic, IService { return ServiceType.FILE_NODE_SERVICE; } - /** - * get restore file path. - */ - public String getRestoreFilePath(String processorName) { - FileNodeProcessor fileNodeProcessor = processorMap.get(processorName); - if (fileNodeProcessor != null) { - return fileNodeProcessor.getFileNodeRestoreFilePath(); - } else { - return null; - } - } - - /** - * recover filenode. - */ - public void recoverFileNode(String filenodeName) - throws StorageGroupManagerException { - FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true); - LOGGER.info("Recover the filenode processor, the filenode is {}, the status is {}", - filenodeName, fileNodeProcessor.getFileNodeProcessorStatus()); - try { - fileNodeProcessor.fileNodeRecovery(); - } catch (FileNodeProcessorException e) { - throw new StorageGroupManagerException(e); - } finally { - fileNodeProcessor.writeUnlock(); - } - } - private enum FileNodeManagerStatus { + private enum StorageGroupManagerStatus { NONE, MERGE, CLOSE } - private static class FileNodeManagerHolder { + private static class StorageGroupManagerHolder { - private FileNodeManagerHolder() { + private StorageGroupManagerHolder() { } - private static final StorageGroupManager INSTANCE = new StorageGroupManager( - TsFileDBConf.getFileNodeDir()); + private static final StorageGroupManager INSTANCE = new StorageGroupManager(); } } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java index c658b8e..238596c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java @@ -49,7 +49,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.Processor; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; -import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus; import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.merge.MergeTask; import org.apache.iotdb.db.engine.modification.Deletion; @@ -68,6 +67,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.IStatistic; import org.apache.iotdb.db.monitor.MonitorConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.crud.UpdatePlan; @@ -286,8 +286,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic { @Override public void registerStatMetadata() { Map<String, String> hashMap = new HashMap<>(); - for (MonitorConstants.FileNodeProcessorStatConstants statConstant : - MonitorConstants.FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants statConstant : + StorageGroupProcessorStatConstants.values()) { hashMap .put(statStorageGroupName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(), MonitorConstants.DATA_TYPE_INT64); @@ -298,8 +298,8 @@ public class StorageGroupProcessor extends Processor implements IStatistic { @Override public List<String> getAllPathForStatistic() { List<String> list = new ArrayList<>(); - for (MonitorConstants.FileNodeProcessorStatConstants statConstant : - MonitorConstants.FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants statConstant : + StorageGroupProcessorStatConstants.values()) { list.add( statStorageGroupName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name()); } @@ -490,7 +490,7 @@ public class StorageGroupProcessor extends Processor implements IStatistic { * @param appendFile the appended tsfile information */ public List<String> getOverlapFiles(TsFileResource appendFile, String uuid) - throws FileNodeProcessorException { + throws TsFileProcessorException { return tsFileProcessor.getOverlapFiles(appendFile, uuid); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java index d822edb..c729285 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java @@ -923,14 +923,14 @@ public class TsFileProcessor extends Processor { * @param appendFile the appended tsfile information */ public List<String> getOverlapFiles(TsFileResource appendFile, String uuid) - throws FileNodeProcessorException { + throws TsFileProcessorException { List<String> overlapFiles = new ArrayList<>(); try { for (TsFileResource tsFileResource : tsFileResources) { getOverlapFile(appendFile, tsFileResource, uuid, overlapFiles); } } catch (IOException e) { - throw new FileNodeProcessorException(String.format("Failed to get tsfiles " + throw new TsFileProcessorException(String.format("Failed to get tsfiles " + "which overlap with the appendFile: %s.", appendFile.getFilePath()), e); } return overlapFiles; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java index a9451a3..ec97a0d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java @@ -60,7 +60,7 @@ public class MonitorConstants { HashMap<String, AtomicLong> hashMap = new HashMap<>(); switch (constantsType) { case FILENODE_PROCESSOR_CONST: - for (FileNodeProcessorStatConstants statConstant : FileNodeProcessorStatConstants + for (StorageGroupProcessorStatConstants statConstant : StorageGroupProcessorStatConstants .values()) { hashMap.put(statConstant.name(), new AtomicLong(0)); } @@ -84,7 +84,7 @@ public class MonitorConstants { TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL } - public enum FileNodeProcessorStatConstants { + public enum StorageGroupProcessorStatConstants { TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 4283e24..b04e506 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupManagerStatConstants; -import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.StorageGroupProcessorStatConstants; import org.apache.iotdb.db.monitor.collector.FileSize; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; @@ -97,7 +97,7 @@ public class StatMonitor implements IService { for (StorageGroupManagerStatConstants constants : StorageGroupManagerStatConstants.values()) { temporaryStatList.add(constants.name()); } - for (FileNodeProcessorStatConstants constants : FileNodeProcessorStatConstants.values()) { + for (StorageGroupProcessorStatConstants constants : StorageGroupProcessorStatConstants.values()) { temporaryStatList.add(constants.name()); } }
