This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch refactor_filenode2 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b75c08772b1e5a6973c0ab59417a31c1300b0ba3 Author: xiangdong huang <[email protected]> AuthorDate: Mon Mar 4 00:23:37 2019 +0800 rewrite FileNode: temporary commit --- .../iotdb/db/engine/filenode/FileNodeManager.java | 20 ++ .../iotdb/db/engine/filenode/FileNodeManager2.java | 68 ++++ .../db/engine/filenode/FileNodeProcessor.java | 33 +- .../org/apache/iotdb/db/monitor/StatMonitor2.java | 359 +++++++++++++++++++++ 4 files changed, 470 insertions(+), 10 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java index 049235f..3bdc12b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java @@ -724,6 +724,26 @@ public class FileNodeManager implements IStatistic, IService { } /** + * Append one specified tsfile to the storage group. <b>This method is only provided for + * transmission module</b> + * + * @param fileNodeName the seriesPath of storage group + * @param appendFile the appended tsfile information + */ + public boolean appendFileToFileNode2(String fileNodeName, IntervalFileNode appendFile, + String appendFilePath) throws FileNodeManagerException { + // 0. if the filenode is in merging process, block. + // 1. if the appendFile has a longer history (i.e., startTime and endTime of each Devices are + // suitable), then just copy the file to the data folder and update file lists; + // 2. if the appendFile has a longer history but the startTime and endTime of some devices are + // overlapped with other TsFiles, then forward this part of data into overflowProcessor and keep + // the rest part of data as a new TsFile. + // 3.
if the start time of some devices >= bufferwrite's timestamp, then flush the bufferwrite, + // and then copy the file to the data folder + return false; + } + + /** + * get all overlap tsfiles which are conflict with the appendFile. * * @param fileNodeName the seriesPath of storage group diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java new file mode 100644 index 0000000..db01ef2 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.db.engine.filenode; + + +import java.io.File; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.Directories; +import org.apache.iotdb.db.monitor.StatMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileNodeManager2 { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeManager.class); + private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig(); + private static final Directories directories = Directories.getInstance(); + + /** + * a folder that persist FileNodeProcessorStore classes. Each stroage group will have a subfolder. + * by default, it is data/system/info + */ + private final String baseDir; + + public static FileNodeManager2 getInstance() { + return FileNodeManagerHolder.INSTANCE; + } + + private static class FileNodeManagerHolder { + + private FileNodeManagerHolder() { + } + + private static final FileNodeManager2 INSTANCE = new FileNodeManager2(IoTDBDescriptor.getInstance().getConfig().fileNodeDir); + } + + private FileNodeManager2(String baseDir) { + //create folder first + File dir = new File(baseDir); + if (dir.mkdirs()) { + LOGGER.info("{} dir's home folder doesn't exist, create it", dir.getPath()); + } + this.baseDir = dir.getAbsolutePath(); + + + } + private void registToStatMonitor() { + StatMonitor.getInstance().recovery(); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 11f0d6a..1b3c189 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -148,7 +148,12 @@ public class FileNodeProcessor extends Processor implements IStatistic { private boolean shouldRecovery = false; // 
statistic monitor parameters private Map<String, Action> parameters; + private FileSchema fileSchema; + + /** + * used for saving fileNodeProcessorStore on disk. + */ private Action flushFileNodeProcessorAction = () -> { synchronized (fileNodeProcessorStore) { try { @@ -158,6 +163,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { } } }; + /** + * used for updating flushLastUpdateTimeMap as lastUpdateTimeMap.value()+1, + * and updating lastUpdateTimeMap into fileNodeProcessorStore + */ private Action bufferwriteFlushAction = () -> { // update the lastUpdateTime Notice: Thread safe synchronized (fileNodeProcessorStore) { @@ -171,17 +180,24 @@ public class FileNodeProcessor extends Processor implements IStatistic { } }; + /** + * + */ private Action bufferwriteCloseAction = new Action() { @Override public void act() { synchronized (fileNodeProcessorStore) { + //why do not deep copy? fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap); addLastTimeToIntervalFile(); fileNodeProcessorStore.setNewFileNodes(newFileNodes); } } + /** + * modify the endTimeMap of currentIntervalFileNode + */ private void addLastTimeToIntervalFile() { if (!newFileNodes.isEmpty()) { @@ -568,7 +584,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { return overflowProcessor; } - public boolean hasOverflowProcessor() { + boolean hasOverflowProcessor() { return overflowProcessor != null; } @@ -585,7 +601,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { /** * set last update time. */ - public void setLastUpdateTime(String deviceId, long timestamp) { + void setLastUpdateTime(String deviceId, long timestamp) { if (!lastUpdateTimeMap.containsKey(deviceId) || lastUpdateTimeMap.get(deviceId) < timestamp) { lastUpdateTimeMap.put(deviceId, timestamp); } @@ -594,7 +610,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { /** * get last update time. 
*/ - public long getLastUpdateTime(String deviceId) { + long getLastUpdateTime(String deviceId) { if (lastUpdateTimeMap.containsKey(deviceId)) { return lastUpdateTimeMap.get(deviceId); @@ -606,21 +622,18 @@ public class FileNodeProcessor extends Processor implements IStatistic { /** * get flush last update time. */ - public long getFlushLastUpdateTime(String deviceId) { + long getFlushLastUpdateTime(String deviceId) { if (!flushLastUpdateTimeMap.containsKey(deviceId)) { flushLastUpdateTimeMap.put(deviceId, 0L); } return flushLastUpdateTimeMap.get(deviceId); } - public Map<String, Long> getLastUpdateTimeMap() { - return lastUpdateTimeMap; - } /** * For insert overflow. */ - public void changeTypeToChanged(String deviceId, long timestamp) { + void changeTypeToChanged(String deviceId, long timestamp) { if (!invertedIndexOfFiles.containsKey(deviceId)) { LOGGER.warn( WARN_NO_SUCH_OVERFLOWED_FILE @@ -819,11 +832,11 @@ public class FileNodeProcessor extends Processor implements IStatistic { /** * append one specified tsfile to this filenode processor. - * + * TODO * @param appendFile the appended tsfile information * @param appendFilePath the seriesPath of appended file */ - public void appendFile(IntervalFileNode appendFile, String appendFilePath) + void appendFile(IntervalFileNode appendFile, String appendFilePath) throws FileNodeProcessorException { try { if (!new File(appendFile.getFilePath()).getParentFile().exists()) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor2.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor2.java new file mode 100644 index 0000000..a10c777 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor2.java @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.monitor; + +import com.sun.tools.javac.util.List; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.filenode.FileNodeManager; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.exception.MetadataArgsErrorException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import 
org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatMonitor2 implements IService { + + private static final Logger LOGGER = LoggerFactory.getLogger(StatMonitor2.class); + private final int backLoopPeriod; + private final int statMonitorDetectFreqSec; + private final int statMonitorRetainIntervalSec; + private long runningTimeMillis = System.currentTimeMillis(); + /** + * key: is the statistics store seriesPath Value: is an interface that implements statistics + * function. + */ + private HashMap<String, IStatistic> statisticMap; + private ScheduledExecutorService service; + + // key: monitored object (e.g., a file node manager, a file node.) + // value: key: metric name, value: the value + private HashMap<String, Map<String, Float>> statistics; + + public void regist + + /** + * stats params. + */ + private AtomicLong numBackLoop = new AtomicLong(0); + private AtomicLong numInsert = new AtomicLong(0); + private AtomicLong numPointsInsert = new AtomicLong(0); + private AtomicLong numInsertError = new AtomicLong(0); + + private StatMonitor2() { + MManager mmanager = MManager.getInstance(); + statisticMap = new HashMap<>(); + IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + statMonitorDetectFreqSec = config.statMonitorDetectFreqSec; + statMonitorRetainIntervalSec = config.statMonitorRetainIntervalSec; + backLoopPeriod = config.backLoopPeriodSec; + if (config.enableStatMonitor) { + try { + String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX; + if (!mmanager.pathExist(prefix)) { + mmanager.setStorageLevelToMTree(prefix); + } + } catch (PathErrorException | IOException e) { + LOGGER.error("MManager cannot set storage level to MTree.", e); + } + } + } + + public static StatMonitor2 getInstance() { + return StatMonitorHolder.INSTANCE; + } + + /** + * generate TSRecord. 
+ * + * @param hashMap key is statParams name, values is AtomicLong type + * @param statGroupDeltaName is the deviceId seriesPath of this module + * @param curTime TODO need to be fixed because it may contain overflow + * @return TSRecord contains the DataPoints of a statGroupDeltaName + */ + public static TSRecord convertToTSRecord(Map<String, AtomicLong> hashMap, + String statGroupDeltaName, long curTime) { + TSRecord tsRecord = new TSRecord(curTime, statGroupDeltaName); + tsRecord.dataPointList = new ArrayList<DataPoint>() { + { + for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) { + AtomicLong value = entry.getValue(); + add(new LongDataPoint(entry.getKey(), value.get())); + } + } + }; + return tsRecord; + } + + public long getNumPointsInsert() { + return numPointsInsert.get(); + } + + public long getNumInsert() { + return numInsert.get(); + } + + public long getNumInsertError() { + return numInsertError.get(); + } + + public void registStatStorageGroup() { + MManager mManager = MManager.getInstance(); + String prefix = MonitorConstants.STAT_STORAGE_GROUP_PREFIX; + try { + if (!mManager.pathExist(prefix)) { + mManager.setStorageLevelToMTree(prefix); + } + } catch (Exception e) { + LOGGER.error("MManager cannot set storage level to MTree.", e); + } + } + + public synchronized void registStatStorageGroup(Map<String, String> hashMap) { + MManager mManager = MManager.getInstance(); + try { + for (Map.Entry<String, String> entry : hashMap.entrySet()) { + if (entry.getKey() == null) { + LOGGER.error("Registering metadata but {} is null", entry.getKey()); + } + + if (!mManager.pathExist(entry.getKey())) { + mManager.addPathToMTree(entry.getKey(), TSDataType.valueOf(entry.getValue()), + TSEncoding.valueOf("RLE"), CompressionType.valueOf(TSFileConfig.compressor), + Collections.emptyMap()); + } + } + } catch (MetadataArgsErrorException | IOException | PathErrorException e) { + LOGGER.error("Initialize the metadata error.", e); + } + } + + public void 
recovery() { + // // restore the FildeNode Manager TOTAL_POINTS statistics info + } + + public void activate() { + + service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, + ThreadName.STAT_MONITOR.getName()); + service.scheduleAtFixedRate( + new StatBackLoop(), 1, backLoopPeriod, TimeUnit.SECONDS); + } + + public void clearIStatisticMap() { + statisticMap.clear(); + } + + public long getNumBackLoop() { + return numBackLoop.get(); + } + + public void registStatistics(String path, IStatistic iStatistic) { + synchronized (statisticMap) { + LOGGER.debug("Register {} to StatMonitor for statistics service", path); + this.statisticMap.put(path, iStatistic); + } + } + + /** + * deregist statistics. + */ + public void deregistStatistics(String path) { + LOGGER.debug("Deregister {} in StatMonitor for stopping statistics service", path); + synchronized (statisticMap) { + if (statisticMap.containsKey(path)) { + statisticMap.put(path, null); + } + } + } + + /** + * TODO: need to complete the query key concept. + * + * @return TSRecord, query statistics params + */ + public Map<String, TSRecord> getOneStatisticsValue(String key) { + // queryPath like fileNode seriesPath: root.stats.car1, + // or FileNodeManager seriesPath:FileNodeManager + String queryPath; + if (key.contains("\\.")) { + queryPath = MonitorConstants.STAT_STORAGE_GROUP_PREFIX + MonitorConstants.MONITOR_PATH_SEPERATOR + + key.replaceAll("\\.", "_"); + } else { + queryPath = key; + } + if (statisticMap.containsKey(queryPath)) { + return statisticMap.get(queryPath).getAllStatisticsValue(); + } else { + long currentTimeMillis = System.currentTimeMillis(); + HashMap<String, TSRecord> hashMap = new HashMap<>(); + TSRecord tsRecord = convertToTSRecord( + MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), queryPath, + currentTimeMillis); + hashMap.put(queryPath, tsRecord); + return hashMap; + } + } + + /** + * get all statistics. 
+ */ + public HashMap<String, TSRecord> gatherStatistics() { + synchronized (statisticMap) { + long currentTimeMillis = System.currentTimeMillis(); + HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>(); + for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) { + if (entry.getValue() == null) { + tsRecordHashMap.put(entry.getKey(), + convertToTSRecord( + MonitorConstants.initValues(MonitorConstants.FILENODE_PROCESSOR_CONST), + entry.getKey(), currentTimeMillis)); + } else { + tsRecordHashMap.putAll(entry.getValue().getAllStatisticsValue()); + } + } + LOGGER.debug("Values of tsRecordHashMap is : {}", tsRecordHashMap.toString()); + for (TSRecord value : tsRecordHashMap.values()) { + value.time = currentTimeMillis; + } + return tsRecordHashMap; + } + } + + private void insert(HashMap<String, TSRecord> tsRecordHashMap) { + FileNodeManager fManager = FileNodeManager.getInstance(); + int count = 0; + int pointNum; + for (Map.Entry<String, TSRecord> entry : tsRecordHashMap.entrySet()) { + try { + fManager.insert(entry.getValue(), true); + numInsert.incrementAndGet(); + pointNum = entry.getValue().dataPointList.size(); + numPointsInsert.addAndGet(pointNum); + count += pointNum; + } catch (FileNodeManagerException e) { + numInsertError.incrementAndGet(); + LOGGER.error("Inserting stat points error.", e); + } + } + } + + /** + * close statistic service. 
+ */ + public void close() { + + if (service == null || service.isShutdown()) { + return; + } + statisticMap.clear(); + service.shutdown(); + try { + service.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error("StatMonitor timing service could not be shutdown.", e); + } + } + + @Override + public void start() throws StartupException { + try { + if (IoTDBDescriptor.getInstance().getConfig().enableStatMonitor) { + activate(); + } + } catch (Exception e) { + String errorMessage = String + .format("Failed to start %s because of %s", this.getID().getName(), + e.getMessage()); + throw new StartupException(errorMessage); + } + } + + @Override + public void stop() { + if (IoTDBDescriptor.getInstance().getConfig().enableStatMonitor) { + close(); + } + } + + @Override + public ServiceType getID() { + return ServiceType.STAT_MONITOR_SERVICE; + } + + private static class StatMonitorHolder { + private StatMonitorHolder(){ + //allowed do nothing + } + private static final StatMonitor2 INSTANCE = new StatMonitor2(); + } + + class StatBackLoop implements Runnable { + @Override + public void run() { + try { + long currentTimeMillis = System.currentTimeMillis(); + long seconds = (currentTimeMillis - runningTimeMillis) / 1000; + if (seconds - statMonitorDetectFreqSec >= 0) { + runningTimeMillis = currentTimeMillis; + // delete time-series data + FileNodeManager fManager = FileNodeManager.getInstance(); + try { + for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) { + for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) { + fManager.delete(entry.getKey(), statParamName, + currentTimeMillis - statMonitorRetainIntervalSec * 1000); + } + } + } catch (FileNodeManagerException e) { + LOGGER.error("Error occurred when deleting statistics information periodically, because", + e); + } + } + HashMap<String, TSRecord> tsRecordHashMap = gatherStatistics(); + insert(tsRecordHashMap); + numBackLoop.incrementAndGet(); 
+ } catch (Exception e) { + LOGGER.error("Error occurred in Stat Monitor thread", e); + } + } + } +}
