This is an automated email from the ASF dual-hosted git repository. liurui pushed a commit to branch file_size_monitor in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit d32bf17f7e155523704f9ad38447fe0acb84e90d Author: liuruiyiyang <[email protected]> AuthorDate: Fri Mar 8 01:24:13 2019 +0800 add monitor statistics for file size --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/engine/filenode/FileNodeManager.java | 88 ++++++------ .../db/engine/filenode/FileNodeProcessor.java | 20 +-- .../org/apache/iotdb/db/monitor/IStatistic.java | 5 +- .../apache/iotdb/db/monitor/MonitorConstants.java | 68 ++++++--- .../org/apache/iotdb/db/monitor/StatMonitor.java | 158 +++++++++++++-------- .../iotdb/db/monitor/collector/FileSize.java | 155 ++++++++++++++++++++ .../org/apache/iotdb/db/monitor/MonitorTest.java | 66 ++++++--- .../iotdb/db/monitor/collector/FileSizeTest.java | 94 ++++++++++++ .../tsfile/write/record/datapoint/DataPoint.java | 2 +- 10 files changed, 500 insertions(+), 158 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 96e2969..1500e7d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -90,7 +90,7 @@ public class IoTDBConfig { /** * Data directory of bufferWrite data. - * It can be setted as bufferWriteDirs = {"settled1", "settled2", "settled3"}; + * It can be settled as bufferWriteDirs = {"settled1", "settled2", "settled3"}; */ public String[] bufferWriteDirs = {"settled"}; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index f4e4864..360b956 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; @@ -109,8 +108,8 @@ public class FileNodeManager implements IStatistic, IService { if (TsFileDBConf.enableStatMonitor) { StatMonitor statMonitor = StatMonitor.getInstance(); - registStatMetadata(); - statMonitor.registStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this); + registerStatMetadata(); + statMonitor.registerStatistics(MonitorConstants.STAT_STORAGE_DELTA_NAME, this); } } @@ -140,7 +139,7 @@ public class FileNodeManager implements IStatistic, IService { List<String> list = new ArrayList<>(); for (MonitorConstants.FileNodeManagerStatConstants statConstant : MonitorConstants.FileNodeManagerStatConstants.values()) { - list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR + list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name()); } return list; @@ -158,18 +157,18 @@ public class FileNodeManager implements IStatistic, IService { } /** - * Init Stat MetaDta + * Init Stat MetaDta. */ @Override - public void registStatMetadata() { + public void registerStatMetadata() { Map<String, String> hashMap = new HashMap<>(); for (MonitorConstants.FileNodeManagerStatConstants statConstant : MonitorConstants.FileNodeManagerStatConstants.values()) { hashMap - .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR + - statConstant.name(), MonitorConstants.DATA_TYPE); + .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR + + statConstant.name(), MonitorConstants.DATA_TYPE_INT64); } - StatMonitor.getInstance().registStatStorageGroup(hashMap); + StatMonitor.getInstance().registerStatStorageGroup(hashMap); } /** @@ -512,9 +511,9 @@ public class FileNodeManager implements IStatistic, IService { } try { overflowProcessor.getLogNode().write(new DeletePlan(timestamp, - new Path(deviceId + "." + measurementId))); + new Path(deviceId + "." + measurementId))); bufferWriteProcessor.getLogNode().write(new DeletePlan(timestamp, - new Path(deviceId + "." + measurementId))); + new Path(deviceId + "." + measurementId))); } catch (IOException e) { throw new FileNodeManagerException(e); } @@ -535,6 +534,39 @@ public class FileNodeManager implements IStatistic, IService { } } + private void delete(String processorName, + Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator) + throws FileNodeManagerException { + if (!processorMap.containsKey(processorName)) { + LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName); + return; + } + LOGGER.info("Try to delete the filenode processor {}.", processorName); + FileNodeProcessor processor = processorMap.get(processorName); + if (!processor.tryWriteLock()) { + LOGGER.warn("Can't get the write lock of the filenode processor {}.", processorName); + return; + } + + try { + if (!processor.canBeClosed()) { + LOGGER.warn("The filenode processor {} can't be deleted.", processorName); + return; + } + + try { + LOGGER.info("Delete the filenode processor {}.", processorName); + processor.delete(); + processorIterator.remove(); + } catch (ProcessorException e) { + LOGGER.error("Delete the filenode processor {} by iterator error.", processorName, e); + throw new FileNodeManagerException(e); + } + } finally { + processor.writeUnlock(); + } + } + /** * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery. */ @@ -571,40 +603,6 @@ public class FileNodeManager implements IStatistic, IService { fileNodeProcessor.setOverflowed(true); } - - private void delete(String processorName, - Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator) - throws FileNodeManagerException { - if (!processorMap.containsKey(processorName)) { - LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName); - return; - } - LOGGER.info("Try to delete the filenode processor {}.", processorName); - FileNodeProcessor processor = processorMap.get(processorName); - if (!processor.tryWriteLock()) { - LOGGER.warn("Can't get the write lock of the filenode processor {}.", processorName); - return; - } - - try { - if (!processor.canBeClosed()) { - LOGGER.warn("The filenode processor {} can't be deleted.", processorName); - return; - } - - try { - LOGGER.info("Delete the filenode processor {}.", processorName); - processor.delete(); - processorIterator.remove(); - } catch (ProcessorException e) { - LOGGER.error("Delete the filenode processor {} by iterator error.", processorName, e); - throw new FileNodeManagerException(e); - } - } finally { - processor.writeUnlock(); - } - } - /** * begin query. */ diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index a7c4e77..18f8354 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -234,8 +234,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { statParamsHashMap.put(statConstant.name(), new AtomicLong(0)); } statStorageDeltaName = - MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPERATOR - + MonitorConstants.FILE_NODE_PATH + MonitorConstants.MONITOR_PATH_SEPERATOR + MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR + + MonitorConstants.FILE_NODE_PATH + MonitorConstants.MONITOR_PATH_SEPARATOR + processorName.replaceAll("\\.", "_"); this.parameters = new HashMap<>(); @@ -292,8 +292,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { // RegistStatService if (TsFileDBConf.enableStatMonitor) { StatMonitor statMonitor = StatMonitor.getInstance(); - registStatMetadata(); - statMonitor.registStatistics(statStorageDeltaName, this); + registerStatMetadata(); + statMonitor.registerStatistics(statStorageDeltaName, this); } try { versionController = new SimpleFileVersionController(fileNodeDirPath); @@ -308,15 +308,15 @@ public class FileNodeProcessor extends Processor implements IStatistic { } @Override - public void registStatMetadata() { + public void registerStatMetadata() { Map<String, String> hashMap = new HashMap<>(); for (MonitorConstants.FileNodeProcessorStatConstants statConstant : MonitorConstants.FileNodeProcessorStatConstants.values()) { hashMap - .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(), - MonitorConstants.DATA_TYPE); + .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name(), + MonitorConstants.DATA_TYPE_INT64); } - StatMonitor.getInstance().registStatStorageGroup(hashMap); + StatMonitor.getInstance().registerStatStorageGroup(hashMap); } @Override @@ -325,7 +325,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { for (MonitorConstants.FileNodeProcessorStatConstants statConstant : MonitorConstants.FileNodeProcessorStatConstants.values()) { list.add( - statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name()); + statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPARATOR + statConstant.name()); } return list; } @@ -1788,7 +1788,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { if (TsFileDBConf.enableStatMonitor) { // remove the monitor LOGGER.info("Deregister the filenode processor: {} from monitor.", getProcessorName()); - StatMonitor.getInstance().deregistStatistics(statStorageDeltaName); + StatMonitor.getInstance().deregisterStatistics(statStorageDeltaName); } closeBufferWrite(); closeOverflow(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java index aeb0953..36c83da 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/IStatistic.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.monitor; import java.util.List; @@ -34,9 +35,9 @@ public interface IStatistic { Map<String, TSRecord> getAllStatisticsValue(); /** - * registStatMetadata registers statistics info to the manager. + * registerStatMetadata registers statistics info to the manager. */ - void registStatMetadata(); + void registerStatMetadata(); /** * Get all module's statistics parameters as a time-series seriesPath. diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java index 21042ee..7f7c5bd 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/MonitorConstants.java @@ -16,35 +16,45 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.monitor; +import java.io.File; import java.util.HashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.monitor.collector.FileSize; +import org.apache.iotdb.db.service.Monitor; public class MonitorConstants { - - public static final String DATA_TYPE = "INT64"; - public static final String FILENODE_PROCESSOR_CONST = "FILENODE_PROCESSOR_CONST"; - public static final String FILENODE_MANAGER_CONST = "FILENODE_MANAGER_CONST"; - public static final String MONITOR_PATH_SEPERATOR = "."; + private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + public static final String DATA_TYPE_INT64 = "INT64"; public static final String STAT_STORAGE_GROUP_PREFIX = "root.stats"; - + static final String FILENODE_PROCESSOR_CONST = "FILENODE_PROCESSOR_CONST"; + private static final String FILENODE_MANAGER_CONST = "FILENODE_MANAGER_CONST"; + static final String FILE_SIZE_CONST = "FILE_SIZE_CONST"; + public static final String MONITOR_PATH_SEPARATOR = "."; + // statistic for file size statistic module + private static final String FILE_SIZE = "file_size"; + public static final String FILE_SIZE_STORAGE_GROUP_NAME = STAT_STORAGE_GROUP_PREFIX + + MONITOR_PATH_SEPARATOR + FILE_SIZE; // statistic for write module - public static final String FILE_NODE_MANAGER_PATH = "write.global"; + static final String FILE_NODE_MANAGER_PATH = "write.global"; public static final String FILE_NODE_PATH = "write"; /** * Stat information. */ public static final String STAT_STORAGE_DELTA_NAME = STAT_STORAGE_GROUP_PREFIX - + MONITOR_PATH_SEPERATOR + FILE_NODE_MANAGER_PATH; + + MONITOR_PATH_SEPARATOR + FILE_NODE_MANAGER_PATH; /** - * function for initing values. + * function for initializing stats values. * * @param constantsType produce initialization values for Statistics Params * @return HashMap contains all the Statistics Params */ - public static HashMap<String, AtomicLong> initValues(String constantsType) { + static HashMap<String, AtomicLong> initValues(String constantsType) { HashMap<String, AtomicLong> hashMap = new HashMap<>(); switch (constantsType) { case FILENODE_PROCESSOR_CONST: @@ -54,23 +64,49 @@ public class MonitorConstants { } break; case FILENODE_MANAGER_CONST: - for (FileNodeManagerStatConstants statConstant : FileNodeManagerStatConstants.values()) { - hashMap.put(statConstant.name(), new AtomicLong(0)); + hashMap = (HashMap<String, AtomicLong>) FileSize.getInstance().getStatParamsHashMap(); + break; + case FILE_SIZE_CONST: + for (FileSizeConstants kinds : FileSizeConstants.values()) { + hashMap.put(kinds.name(), new AtomicLong(0)); } break; default: - // TODO: throws some errors + break; } return hashMap; } public enum FileNodeManagerStatConstants { - TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL, - + TOTAL_POINTS, TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL } public enum FileNodeProcessorStatConstants { - TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL, + TOTAL_REQ_SUCCESS, TOTAL_REQ_FAIL, TOTAL_POINTS_SUCCESS, TOTAL_POINTS_FAIL + } + + public enum OsStatConstants { + NETWORK_REC, NETWORK_SEND, CPU_USAGE, MEM_USAGE, IOTDB_MEM_SIZE, DISK_USAGE, DISK_READ_SPEED, + DISK_WRITE_SPEED, DISK_TPS + } + + public enum FileSizeConstants { + DATA(Monitor.INSTANCE.getBaseDirectory()), + OVERFLOW(new File(config.overflowDataDir).getAbsolutePath()), + SETTLED(Monitor.INSTANCE.getBaseDirectory() + File.separatorChar + "settled"), + WAL(new File(config.walFolder).getAbsolutePath()), + INFO(new File(config.fileNodeDir).getAbsolutePath()), + SCHEMA(new File(config.metadataDir).getAbsolutePath()); + + public String getPath() { + return path; + } + + private String path; + + FileSizeConstants(String path) { + this.path = path; + } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 56d9891..3e34d92 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.monitor; import java.io.IOException; @@ -25,7 +26,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -36,10 +36,12 @@ import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants; +import org.apache.iotdb.db.monitor.collector.FileSize; import org.apache.iotdb.db.service.IService; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.tsfile.write.record.TSRecord; -import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +53,12 @@ public class StatMonitor implements IService { private final int statMonitorDetectFreqSec; private final int statMonitorRetainIntervalSec; private long runningTimeMillis = System.currentTimeMillis(); + private static final ArrayList<String> temporaryStatList = new ArrayList<>(); /** * key: is the statistics store seriesPath Value: is an interface that implements statistics * function. */ - private HashMap<String, IStatistic> statisticMap; + private final HashMap<String, IStatistic> statisticMap; private ScheduledExecutorService service; /** @@ -67,6 +70,7 @@ public class StatMonitor implements IService { private AtomicLong numInsertError = new AtomicLong(0); private StatMonitor() { + initTemporaryStatList(); MManager mmanager = MManager.getInstance(); statisticMap = new HashMap<>(); IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -85,6 +89,15 @@ public class StatMonitor implements IService { } } + private void initTemporaryStatList() { + for (FileNodeManagerStatConstants constants : FileNodeManagerStatConstants.values()) { + temporaryStatList.add(constants.name()); + } + for (FileNodeProcessorStatConstants constants : FileNodeProcessorStatConstants.values()) { + temporaryStatList.add(constants.name()); + } + } + public static StatMonitor getInstance() { return StatMonitorHolder.INSTANCE; } @@ -94,20 +107,17 @@ public class StatMonitor implements IService { * * @param hashMap key is statParams name, values is AtomicLong type * @param statGroupDeltaName is the deviceId seriesPath of this module - * @param curTime TODO need to be fixed because it may contain overflow + * @param curTime current time stamp * @return TSRecord contains the DataPoints of a statGroupDeltaName */ public static TSRecord convertToTSRecord(Map<String, AtomicLong> hashMap, - String statGroupDeltaName, long curTime) { + String statGroupDeltaName, long curTime) { TSRecord tsRecord = new TSRecord(curTime, statGroupDeltaName); - tsRecord.dataPointList = new ArrayList<DataPoint>() { - { - for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) { - AtomicLong value = entry.getValue(); - add(new LongDataPoint(entry.getKey(), value.get())); - } - } - }; + tsRecord.dataPointList = new ArrayList<>(); + for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) { + AtomicLong value = entry.getValue(); + tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), value.get())); + } return tsRecord; } @@ -123,7 +133,7 @@ public class StatMonitor implements IService { return numInsertError.get(); } - public void registStatStorageGroup() { + void registerStatStorageGroup() { MManager mManager = MManager.getInstance(); String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX; try { @@ -135,12 +145,16 @@ public class StatMonitor implements IService { } } - public synchronized void registStatStorageGroup(Map<String, String> hashMap) { + /** + * register monitor statistics time series metadata into MManager. + * @param hashMap series path and data type pair, for example: [root.stat.file.size.DATA, INT64] + */ + public synchronized void registerStatStorageGroup(Map<String, String> hashMap) { MManager mManager = MManager.getInstance(); try { for (Map.Entry<String, String> entry : hashMap.entrySet()) { - if (entry.getKey() == null) { - LOGGER.error("Registering metadata but {} is null", entry.getKey()); + if (entry.getValue() == null) { + LOGGER.error("Registering metadata but data type of {} is null", entry.getKey()); } if (!mManager.pathExist(entry.getKey())) { @@ -156,15 +170,14 @@ public class StatMonitor implements IService { // // restore the FildeNode Manager TOTAL_POINTS statistics info } - public void activate() { - + void activate() { service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, - ThreadName.STAT_MONITOR.getName()); + ThreadName.STAT_MONITOR.getName()); service.scheduleAtFixedRate( new StatBackLoop(), 1, backLoopPeriod, TimeUnit.SECONDS); } - public void clearIStatisticMap() { + void clearIStatisticMap() { statisticMap.clear(); } @@ -172,7 +185,12 @@ public class StatMonitor implements IService { return numBackLoop.get(); } - public void registStatistics(String path, IStatistic iStatistic) { + /** + * register class which implemented IStatistic interface into statisticMap + * @param path the stat series prefix path, like root.stat.file.size + * @param iStatistic instance of class which implemented IStatistic interface + */ + public void registerStatistics(String path, IStatistic iStatistic) { synchronized (statisticMap) { LOGGER.debug("Register {} to StatMonitor for statistics service", path); this.statisticMap.put(path, iStatistic); @@ -180,9 +198,9 @@ public class StatMonitor implements IService { } /** - * deregist statistics. + * deregister statistics. */ - public void deregistStatistics(String path) { + public void deregisterStatistics(String path) { LOGGER.debug("Deregister {} in StatMonitor for stopping statistics service", path); synchronized (statisticMap) { if (statisticMap.containsKey(path)) { @@ -192,7 +210,7 @@ public class StatMonitor implements IService { } /** - * TODO: need to complete the query key concept. + * This function is not used and need to complete the query key concept. * * @return TSRecord, query statistics params */ @@ -201,8 +219,9 @@ public class StatMonitor implements IService { // or FileNodeManager seriesPath:FileNodeManager String queryPath; if (key.contains("\\.")) { - queryPath = MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPERATOR - + key.replaceAll("\\.", "_"); + queryPath = + MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR + + key.replaceAll("\\.", "_"); } else { queryPath = key; } @@ -212,8 +231,8 @@ public class StatMonitor implements IService { long currentTimeMillis = System.currentTimeMillis(); HashMap<String, TSRecord> hashMap = new HashMap<>(); TSRecord tsRecord = convertToTSRecord( - MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), queryPath, - currentTimeMillis); + MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), queryPath, + currentTimeMillis); hashMap.put(queryPath, tsRecord); return hashMap; } @@ -222,21 +241,31 @@ public class StatMonitor implements IService { /** * get all statistics. */ - public HashMap<String, TSRecord> gatherStatistics() { + public Map<String, TSRecord> gatherStatistics() { synchronized (statisticMap) { long currentTimeMillis = System.currentTimeMillis(); HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>(); for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) { if (entry.getValue() == null) { - tsRecordHashMap.put(entry.getKey(), + switch (entry.getKey()) { + case MonitorConstants.STAT_STORAGE_DELTA_NAME: + tsRecordHashMap.put(entry.getKey(), + convertToTSRecord( + MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), + entry.getKey(), currentTimeMillis)); + break; + case MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME: + tsRecordHashMap.put(entry.getKey(), convertToTSRecord( - MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), - entry.getKey(), currentTimeMillis)); + MonitorConstants.initValues(MonitorConstants.FILE_SIZE_CONST), + entry.getKey(), currentTimeMillis)); + break; + default: + } } else { tsRecordHashMap.putAll(entry.getValue().getAllStatisticsValue()); } } - LOGGER.debug("Values of tsRecordHashMap is : {}", tsRecordHashMap.toString()); for (TSRecord value : tsRecordHashMap.values()) { value.time = currentTimeMillis; } @@ -244,23 +273,6 @@ public class StatMonitor implements IService { } } - private void insert(HashMap<String, TSRecord> tsRecordHashMap) { - FileNodeManager fManager = FileNodeManager.getInstance(); - int count = 0; - int pointNum; - for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) { - try { - fManager.insert(entry.getValue(), true); - numInsert.incrementAndGet(); - pointNum = entry.getValue().dataPointList.size(); - numPointsInsert.addAndGet(pointNum); - count += pointNum; - } catch (FileNodeManagerException e) { - numInsertError.incrementAndGet(); - LOGGER.error("Inserting stat points error.", e); - } - } - } /** * close statistic service. @@ -276,6 +288,8 @@ public class StatMonitor implements IService { service.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.error("StatMonitor timing service could not be shutdown.", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); } } @@ -287,8 +301,8 @@ public class StatMonitor implements IService { } } catch (Exception e) { String errorMessage = String - .format("Failed to start %s because of %s", this.getID().getName(), - e.getMessage()); + .format("Failed to start %s because of %s", this.getID().getName(), + e.getMessage()); throw new StartupException(errorMessage); } } @@ -306,40 +320,64 @@ public class StatMonitor implements IService { } private static class StatMonitorHolder { - private StatMonitorHolder(){ + + private StatMonitorHolder() { //allowed do nothing } + private static final StatMonitor INSTANCE = new StatMonitor(); } class StatBackLoop implements Runnable { + @Override public void run() { try { long currentTimeMillis = System.currentTimeMillis(); long seconds = (currentTimeMillis - runningTimeMillis) / 1000; - if (seconds - statMonitorDetectFreqSec >= 0) { + if (seconds >= statMonitorDetectFreqSec) { runningTimeMillis = currentTimeMillis; // delete time-series data FileNodeManager fManager = FileNodeManager.getInstance(); try { for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) { for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) { - fManager.delete(entry.getKey(), statParamName, - currentTimeMillis - statMonitorRetainIntervalSec * 1000); + if (temporaryStatList.contains(statParamName)) { + fManager.delete(entry.getKey(), statParamName, + currentTimeMillis - statMonitorRetainIntervalSec * 1000); + } } } } catch (FileNodeManagerException e) { - LOGGER.error("Error occurred when deleting statistics information periodically, because", - e); + LOGGER + .error("Error occurred when deleting statistics information periodically, because", + e); } } - HashMap<String, TSRecord> tsRecordHashMap = gatherStatistics(); + Map<String, TSRecord> tsRecordHashMap = gatherStatistics(); + FileSize fileSize = FileSize.getInstance(); + tsRecordHashMap.putAll(fileSize.getAllStatisticsValue()); insert(tsRecordHashMap); numBackLoop.incrementAndGet(); } catch (Exception e) { LOGGER.error("Error occurred in Stat Monitor thread", e); } } + + public void insert(Map<String, TSRecord> tsRecordHashMap) { + FileNodeManager fManager = FileNodeManager.getInstance(); + int pointNum; + for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) { + try { + fManager.insert(entry.getValue(), true); + numInsert.incrementAndGet(); + pointNum = entry.getValue().dataPointList.size(); + numPointsInsert.addAndGet(pointNum); + } catch (FileNodeManagerException e) { + numInsertError.incrementAndGet(); + LOGGER.error("Inserting stat points error.", e); + } + } + } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java new file mode 100644 index 0000000..f3f1c13 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/collector/FileSize.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.monitor.collector; + +import java.io.File; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.monitor.IStatistic; +import org.apache.iotdb.db.monitor.MonitorConstants; +import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants; +import org.apache.iotdb.db.monitor.StatMonitor; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is to collect some file size statistics. + */ +public class FileSize implements IStatistic { + + private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger LOGGER = LoggerFactory.getLogger(FileSize.class); + private static final long ABNORMAL_VALUE = -1L; + private static final long INIT_VALUE_IF_FILE_NOT_EXIST = 0L; + + @Override + public Map<String, TSRecord> getAllStatisticsValue() { + long curTime = System.currentTimeMillis(); + TSRecord tsRecord = StatMonitor + .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, + curTime); + HashMap<String, TSRecord> ret = new HashMap<>(); + ret.put(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, tsRecord); + return ret; + } + + @Override + public void registerStatMetadata() { + Map<String, String> hashMap = new HashMap<>(); + for (FileSizeConstants kind : MonitorConstants.FileSizeConstants.values()) { + hashMap + .put(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME + + MonitorConstants.MONITOR_PATH_SEPARATOR + + kind.name(), MonitorConstants.DATA_TYPE_INT64); + } + StatMonitor.getInstance().registerStatStorageGroup(hashMap); + } + + @Override + public List<String> getAllPathForStatistic() { + List<String> list = new ArrayList<>(); + for (FileSizeConstants kind : MonitorConstants.FileSizeConstants.values()) { + list.add( + MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR + + kind.name()); + } + return list; + } + + @Override + public Map<String, AtomicLong> getStatParamsHashMap() { + Map<FileSizeConstants, Long> fileSizeMap = getFileSizesInByte(); + Map<String, AtomicLong> statParamsMap = new HashMap<>(); + for (FileSizeConstants kind : MonitorConstants.FileSizeConstants.values()) { + statParamsMap.put(kind.name(), new AtomicLong(fileSizeMap.get(kind))); + } + return statParamsMap; + } + + private static class FileSizeHolder { + + private static final FileSize INSTANCE = new FileSize(); + } + + private FileSize() { + if (config.enableStatMonitor) { + StatMonitor statMonitor = StatMonitor.getInstance(); + registerStatMetadata(); + statMonitor.registerStatistics(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, this); + } + } + + public static FileSize getInstance() { + return FileSizeHolder.INSTANCE; + } + + /** + * Return a map[FileSizeConstants, Long]. The key is the dir type and the value is the dir size in + * byte. + * + * @return a map[FileSizeConstants, Long] with the dir type and the dir size in byte + */ + public Map<FileSizeConstants, Long> getFileSizesInByte() { + EnumMap<FileSizeConstants, Long> fileSizes = new EnumMap<>(FileSizeConstants.class); + for (FileSizeConstants kinds : MonitorConstants.FileSizeConstants.values()) { + if (kinds.equals(MonitorConstants.FileSizeConstants.SETTLED)) { + //sum bufferWriteDirs size + long settledSize = INIT_VALUE_IF_FILE_NOT_EXIST; + for (String bufferWriteDir : config.bufferWriteDirs) { + File settledFile = new File(bufferWriteDir); + if (settledFile.exists()) { + try { + settledSize += FileUtils.sizeOfDirectory(settledFile); + } catch (Exception e) { + LOGGER.error("Meet error while trying to get {} size with dir {} .", kinds, + bufferWriteDir, e); + fileSizes.put(kinds, ABNORMAL_VALUE); + } + } + } + fileSizes.put(kinds, settledSize); + } else { + File file = new File(kinds.getPath()); + if (file.exists()) { + try { + fileSizes.put(kinds, FileUtils.sizeOfDirectory(file)); + } catch (Exception e) { + LOGGER + .error("Meet error while trying to get {} size with dir {} .", kinds, + kinds.getPath(), + e); + fileSizes.put(kinds, ABNORMAL_VALUE); + } + } else { + fileSizes.put(kinds, INIT_VALUE_IF_FILE_NOT_EXIST); + } + } + } + return fileSizes; + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java index b198fcb..0999ac4 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/monitor/MonitorTest.java @@ -16,16 +16,23 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.monitor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.util.Map; import java.util.concurrent.atomic.AtomicLong; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants; +import org.apache.iotdb.db.monitor.collector.FileSize; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -33,17 +40,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; - -/** - * @author Liliang - */ - public class MonitorTest { - private IoTDBConfig tsdbconfig = IoTDBDescriptor.getInstance().getConfig(); - - private FileNodeManager fManager = null; + private IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig(); private StatMonitor statMonitor; @Before @@ -52,35 +51,45 @@ public class MonitorTest { // modify stat parameter EnvironmentUtils.closeMemControl(); EnvironmentUtils.envSetUp(); - tsdbconfig.enableStatMonitor = true; - tsdbconfig.backLoopPeriodSec = 1; + ioTDBConfig.enableStatMonitor = true; + ioTDBConfig.backLoopPeriodSec = 1; } @After public void tearDown() throws Exception { - tsdbconfig.enableStatMonitor = false; + ioTDBConfig.enableStatMonitor = false; statMonitor.close(); EnvironmentUtils.cleanEnv(); } @Test public void testFileNodeManagerMonitorAndAddMetadata() { - fManager = FileNodeManager.getInstance(); + FileNodeManager fManager = FileNodeManager.getInstance(); + FileSize fileSize = FileSize.getInstance(); statMonitor = StatMonitor.getInstance(); - statMonitor.registStatStorageGroup(); + statMonitor.registerStatStorageGroup(); fManager.getStatParamsHashMap().forEach((key, value) -> value.set(0)); + fileSize.getStatParamsHashMap().forEach((key, value) -> value.set(0)); statMonitor.clearIStatisticMap(); - statMonitor.registStatistics(fManager.getClass().getSimpleName(), fManager); + statMonitor.registerStatistics(fManager.getClass().getSimpleName(), fManager); + statMonitor + .registerStatistics(MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME, FileSize.getInstance()); // add metadata MManager mManager = MManager.getInstance(); - fManager.registStatMetadata(); + fManager.registerStatMetadata(); + fileSize.registerStatMetadata(); Map<String, AtomicLong> statParamsHashMap = fManager.getStatParamsHashMap(); + Map<String, AtomicLong> fileSizeStatsHashMap = fileSize.getStatParamsHashMap(); for (String statParam : statParamsHashMap.keySet()) { - assertEquals(true, - mManager.pathExist( - MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPERATOR - + MonitorConstants.FILE_NODE_MANAGER_PATH + MonitorConstants.MONITOR_PATH_SEPERATOR - + statParam)); + assertTrue(mManager.pathExist( + MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPARATOR + + MonitorConstants.FILE_NODE_MANAGER_PATH + MonitorConstants.MONITOR_PATH_SEPARATOR + + statParam)); + } + for (String statParam : fileSizeStatsHashMap.keySet()) { + assertTrue(mManager.pathExist( + MonitorConstants.FILE_SIZE_STORAGE_GROUP_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR + + statParam)); } statMonitor.activate(); // wait for time second @@ -95,16 +104,20 @@ public class MonitorTest { // Get stat data and test right Map<String, TSRecord> statHashMap = fManager.getAllStatisticsValue(); + Map<String, TSRecord> fileSizeStatMap = fileSize.getAllStatisticsValue(); String path = fManager.getAllPathForStatistic().get(0); + String fileSizeStatPath = fileSize.getAllPathForStatistic().get(0); int pos = path.lastIndexOf('.'); + int fileSizeStatPos = fileSizeStatPath.lastIndexOf('.'); TSRecord fTSRecord = statHashMap.get(path.substring(0, pos)); + TSRecord fileSizeRecord = fileSizeStatMap.get(fileSizeStatPath.substring(0, fileSizeStatPos)); assertNotEquals(null, fTSRecord); + assertNotEquals(null, fileSizeRecord); for (DataPoint dataPoint : fTSRecord.dataPointList) { String m = dataPoint.getMeasurementId(); Long v = (Long) dataPoint.getValue(); - if (m.equals("TOTAL_REQ_SUCCESS")) { assertEquals(v, new Long(0)); } @@ -116,6 +129,13 @@ public class MonitorTest { assertEquals(v, new Long(0)); } } + for (DataPoint dataPoint : fileSizeRecord.dataPointList) { + String m = dataPoint.getMeasurementId(); + Long v = (Long) dataPoint.getValue(); + if (m.equals(FileSizeConstants.OVERFLOW.name())) { + assertEquals(v, new Long(0)); + } + } try { fManager.deleteAll(); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/monitor/collector/FileSizeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/monitor/collector/FileSizeTest.java new file mode 100644 index 0000000..d075060 --- /dev/null +++ b/iotdb/src/test/java/org/apache/iotdb/db/monitor/collector/FileSizeTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.monitor.collector; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import org.apache.iotdb.db.monitor.MonitorConstants.FileSizeConstants; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class FileSizeTest { + + private static final String TEST_FILE_CONTENT = "FileSize UT test file"; + private static final String TEST_FILE_PATH = + FileSizeConstants.DATA.getPath() + File.separatorChar + "testFile"; + + @Before + public void setUp() throws Exception { + EnvironmentUtils.closeMemControl(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testGetFileSizesInByte() { + long dataSizeBefore; + long dataSizeAfter; + boolean isWriteSuccess = true; + File testFile = new File(TEST_FILE_PATH); + if (testFile.exists()) { + try { + Files.delete(testFile.toPath()); + } catch (IOException e) { + isWriteSuccess = false; + e.printStackTrace(); + } + } + try { + if (!testFile.createNewFile()) { + isWriteSuccess = false; + } + } catch (IOException e) { + e.printStackTrace(); + } + + dataSizeBefore = FileSize.getInstance().getFileSizesInByte().get(FileSizeConstants.DATA); + byte[] contentInBytes = TEST_FILE_CONTENT.getBytes(); + // write something into the test file under data dir + try (FileOutputStream fileOutputStream = new FileOutputStream(testFile)) { + fileOutputStream.write(contentInBytes); + fileOutputStream.flush(); + } catch (IOException e) { + isWriteSuccess = false; + e.printStackTrace(); + } + // calculate the delta of data dir file size + dataSizeAfter = FileSize.getInstance().getFileSizesInByte().get(FileSizeConstants.DATA); + long deltaSize = dataSizeAfter - dataSizeBefore; + + if (isWriteSuccess) { + //check if the the delta of data dir file size is equal to the written content size in byte + assertEquals(contentInBytes.length, deltaSize); + } else { + assertEquals(0, deltaSize); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java index 83b7f2a..8cac0ea 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/datapoint/DataPoint.java @@ -85,7 +85,7 @@ public abstract class DataPoint { dataPoint = new StringDataPoint(measurementId, new Binary(value)); break; default: - throw new UnSupportedDataTypeException("This data type is not supoort -" + dataType); + throw new UnSupportedDataTypeException("This data type is not supported -" + dataType); } return dataPoint; }
