This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch refactor_filenode2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_filenode2 by this
push:
new 2fada7b temporary commit
2fada7b is described below
commit 2fada7b922eb07e4071a7502e09698c14fc4a5a8
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Mar 7 23:28:47 2019 +0800
temporary commit
---
.../iotdb/db/engine/filenode/FileNodeManager.java | 20 +-
.../iotdb/db/engine/filenode/FileNodeManager2.java | 241 ++-
.../db/engine/filenode/FileNodeProcessor2.java | 2021 ++++++++++++++++++++
3 files changed, 2262 insertions(+), 20 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 3bdc12b..5f1407f 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -723,25 +723,7 @@ public class FileNodeManager implements IStatistic,
IService {
return true;
}
- /**
- * Append one specified tsfile to the storage group. <b>This method is only
provided for
- * transmission module</b>
- *
- * @param fileNodeName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- public boolean appendFileToFileNode2(String fileNodeName, IntervalFileNode
appendFile,
- String appendFilePath) throws FileNodeManagerException {
- // 0. if the filenode is in merging process, block.
- // 1. if the appendFile has a longer history (i.e, startTime and endTime
of each Devices are
- // suitable), then just copy the file to the data folder and update file
lists;
- // 2. if the appendFile has a longer history but the startTime and endTime
of some devices are
- // overlappted with other TsFiles, then forward this part of data into
overflowProcessor and keep
- // the rest part of data as a new TsFile.
- // 3. if the start time of some devices >= bufferwrite's timestamp, then
flush the bufferwrite,
- // and then copy the file to the dta folder
- return false;
- }
+
/**
* get all overlap tsfiles which are conflict with the appendFile.
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java
index db01ef2..548a455 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager2.java
@@ -21,14 +21,34 @@ package org.apache.iotdb.db.engine.filenode;
import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FileNodeManager2 {
+public class FileNodeManager2 implements IStatistic, IService {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileNodeManager.class);
private static final IoTDBConfig TsFileDBConf =
IoTDBDescriptor.getInstance().getConfig();
@@ -65,4 +85,223 @@ public class FileNodeManager2 {
private void registToStatMonitor() {
StatMonitor.getInstance().recovery();
}
+
+ /**
+ * recovery the filenode manager.
+ */
+ public void recovery() {
+ List<String> filenodeNames = null;
+ try {
+ filenodeNames = MManager.getInstance().getAllFileNames();
+ } catch (PathErrorException e) {
+ LOGGER.error("Restoring all FileNodes failed.", e);
+ return;
+ }
+ for (String filenodeName : filenodeNames) {
+ FileNodeProcessor fileNodeProcessor = null;
+ try {
+ fileNodeProcessor = getProcessor(filenodeName, true);
+ if (fileNodeProcessor.shouldRecovery()) {
+ LOGGER.info("Recovery the filenode processor, the filenode is {},
the status is {}",
+ filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
+ fileNodeProcessor.fileNodeRecovery();
+ }
+ } catch (FileNodeManagerException | FileNodeProcessorException e) {
+ LOGGER.error("Restoring fileNode {} failed.", filenodeName, e);
+ } finally {
+ if (fileNodeProcessor != null) {
+ fileNodeProcessor.writeUnlock();
+ }
+ }
+ // add index check sum
+ }
+ }
+
+ /**
+ * insert TsRecord into storage group.
+ *
+ * @param tsRecord input Data
+ * @param isMonitor if true, the insertion is done by StatMonitor and the
statistic Info will not
+ * be recorded. if false, the statParamsHashMap will be updated.
+ * @return an int value represents the insert type
+ */
+ public int insert(TSRecord tsRecord, boolean isMonitor) throws
FileNodeManagerException {
+
+ }
+
+ /**
+ * update data.
+ */
+ public void update(String deviceId, String measurementId, long startTime,
long endTime,
+ TSDataType type, String v)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * delete data.
+ */
+ public void delete(String deviceId, String measurementId, long timestamp)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in BufferWrite. Only used by
WAL recovery.
+ */
+ public void deleteBufferWrite(String deviceId, String measurementId, long
timestamp)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in Overflow. Only used by WAL
recovery.
+ */
+ public void deleteOverflow(String deviceId, String measurementId, long
timestamp)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * begin query.
+ * @param deviceId queried deviceId
+ * @return a query token for the device.
+ */
+ public int beginQuery(String deviceId) throws FileNodeManagerException {
+
+ }
+
+ /**
+ * query data.
+ */
+ public QueryDataSource query(SingleSeriesExpression seriesExpression,
QueryContext context)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * end query.
+ */
+ public void endQuery(String deviceId, int token) throws
FileNodeManagerException {
+
+ }
+
+ /**
+ * Append one specified tsfile to the storage group. <b>This method is only
provided for
+ * transmission module</b>
+ *
+ * @param fileNodeName the seriesPath of storage group
+ * @param appendFile the appended tsfile information
+ */
+ public boolean appendFileToFileNode(String fileNodeName, IntervalFileNode
appendFile,
+ String appendFilePath) throws FileNodeManagerException {
+ // 0. if the filenode is in merging process, block.
+ // 1. if the appendFile has a longer history (i.e, startTime and endTime
of each Devices are
+ // suitable), then just copy the file to the data folder and update file
lists;
+ // 2. if the appendFile has a longer history but the startTime and endTime
of some devices are
+ // overlappted with other TsFiles, then forward this part of data into
overflowProcessor and keep
+ // the rest part of data as a new TsFile.
+ // 3. if the start time of some devices >= bufferwrite's timestamp, then
flush the bufferwrite,
+ // and then copy the file to the dta folder
+ }
+
+ /**
+ * delete one filenode.
+ */
+ public void deleteOneFileNode(String processorName) throws
FileNodeManagerException {
+
+ }
+
+ /**
+ * add time series.
+ */
+ public void addTimeSeries(
+ Path path, TSDataType dataType, TSEncoding encoding, CompressionType
compressor,
+ Map<String, String> props) throws FileNodeManagerException {
+
+ }
+
+ /**
+ * Force to close the filenode processor.
+ */
+ public void closeOneFileNode(String processorName) throws
FileNodeManagerException {
+
+ }
+
+ /**
+ * delete all filenode.
+ */
+ public synchronized boolean deleteAll() throws FileNodeManagerException {
+
+ }
+
+ /**
+ * Try to close All.
+ */
+ public void closeAll() throws FileNodeManagerException {
+
+ }
+
+ /**
+ * force flush to control memory usage.
+ */
+ public void forceFlush(BasicMemController.UsageLevel level) {
+
+ }
+
+ /**
+ * recover filenode.
+ */
+ public void recoverFileNode(String filenodeName)
+ throws FileNodeManagerException {
+
+ }
+
+ /**
+ * merge all overflowed filenode.
+ *
+ * @throws FileNodeManagerException FileNodeManagerException
+ */
+ public void mergeAll() throws FileNodeManagerException {
+
+ }
+
+
+
+
+
+ @Override
+ public Map<String, TSRecord> getAllStatisticsValue() {
+ return null;
+ }
+
+ @Override
+ public void registStatMetadata() {
+
+ }
+
+ @Override
+ public List<String> getAllPathForStatistic() {
+ return null;
+ }
+
+ @Override
+ public Map<String, AtomicLong> getStatParamsHashMap() {
+ return null;
+ }
+
+ @Override
+ public void start() throws StartupException {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ServiceType getID() {
+ return null;
+ }
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
new file mode 100644
index 0000000..311d04a
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
@@ -0,0 +1,2021 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.filenode;
+
+import static java.time.ZonedDateTime.ofInstant;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.Processor;
+import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.bufferwrite.ActionException;
+import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
+import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
+import org.apache.iotdb.db.engine.pool.MergeManager;
+import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.ErrorDebugException;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
+import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IReader;
+import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileNodeProcessor2 extends Processor implements IStatistic {
+
+ private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any
tsfile which"
+ + " will be overflowed in the filenode processor {}, ";
+ private static final String RESTORE_FILE_SUFFIX = ".restore";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileNodeProcessor2.class);
+ private static final IoTDBConfig TsFileDBConf =
IoTDBDescriptor.getInstance().getConfig();
+ private static final MManager mManager = MManager.getInstance();
+ private static final Directories directories = Directories.getInstance();
+ private final String statStorageDeltaName;
+ private final HashMap<String, AtomicLong> statParamsHashMap = new
HashMap<>();
+ /**
+ * Used to keep the oldest timestamp for each deviceId. The key is deviceId.
+ */
+ private volatile boolean isOverflowed;
+ private Map<String, Long> lastUpdateTimeMap;
+ private Map<String, Long> flushLastUpdateTimeMap;
+ private Map<String, List<IntervalFileNode>> invertedIndexOfFiles;
+ private IntervalFileNode emptyIntervalFileNode;
+ private IntervalFileNode currentIntervalFileNode;
+ private List<IntervalFileNode> newFileNodes;
+ private FileNodeProcessorStatus isMerging;
+ // this is used when work->merge operation
+ private int numOfMergeFile;
+ private FileNodeProcessorStore fileNodeProcessorStore;
+ private String fileNodeRestoreFilePath;
+ private final Object fileNodeRestoreLock = new Object();
+ private String baseDirPath;
+ // last merge time
+ private long lastMergeTime = -1;
+ private BufferWriteProcessor bufferWriteProcessor = null;
+ private OverflowProcessor overflowProcessor = null;
+ private Set<Integer> oldMultiPassTokenSet = null;
+ private Set<Integer> newMultiPassTokenSet = new HashSet<>();
+ private ReadWriteLock oldMultiPassLock = null;
+ private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
+ // system recovery
+ private boolean shouldRecovery = false;
+ // statistic monitor parameters
+ private Map<String, Action> parameters;
+
+ private FileSchema fileSchema;
+
+ /**
+ * used for saving fileNodeProcessorStore on disk.
+ */
+ private Action flushFileNodeProcessorAction = () -> {
+ synchronized (fileNodeProcessorStore) {
+ try {
+ writeStoreToDisk(fileNodeProcessorStore);
+ } catch (FileNodeProcessorException e) {
+ throw new ActionException(e);
+ }
+ }
+ };
+ /**
+ * used for updating flushLastUpdateTimeMap as lastUpdateTimeMap.value()+1,
+ * and updating lastUpdateTimeMap into fileNodeProcessorStore
+ */
+ private Action bufferwriteFlushAction = () -> {
+ // update the lastUpdateTime Notice: Thread safe
+ synchronized (fileNodeProcessorStore) {
+ // deep copy
+ Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
+ // update flushLastUpdateTimeMap
+ for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
+ flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
+ }
+ fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
+ }
+ };
+
+ /**
+ *
+ */
+ private Action bufferwriteCloseAction = new Action() {
+
+ @Override
+ public void act() {
+ synchronized (fileNodeProcessorStore) {
+ //why do not deep copy?
+ fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
+ addLastTimeToIntervalFile();
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ }
+ }
+
+ /**
+ * modify the endTimeMap of currentIntervalFileNode
+ */
+ private void addLastTimeToIntervalFile() {
+
+ if (!newFileNodes.isEmpty()) {
+ // end time with one start time
+ Map<String, Long> endTimeMap = new HashMap<>();
+ for (Entry<String, Long> startTime :
currentIntervalFileNode.getStartTimeMap().entrySet()) {
+ String deviceId = startTime.getKey();
+ endTimeMap.put(deviceId, lastUpdateTimeMap.get(deviceId));
+ }
+ currentIntervalFileNode.setEndTimeMap(endTimeMap);
+ }
+ }
+ };
+ private Action overflowFlushAction = () -> {
+
+ // update the new IntervalFileNode List and emptyIntervalFile.
+ // Notice: thread safe
+ synchronized (fileNodeProcessorStore) {
+ fileNodeProcessorStore.setOverflowed(isOverflowed);
+ fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ }
+ };
+ // Token for query which used to
+ private int multiPassLockToken = 0;
+ private VersionController versionController;
+ private ReentrantLock mergeDeleteLock = new ReentrantLock();
+
+ /**
+ * This is the modification file of the result of the current merge.
+ */
+ private ModificationFile mergingModification;
+
+ private TsFileIOWriter mergeFileWriter = null;
+ private String mergeOutputPath = null;
+ private String mergeBaseDir = null;
+ private String mergeFileName = null;
+ private boolean mergeIsChunkGroupHasData = false;
+ private long mergeStartPos;
+
+ /**
+ * constructor of FileNodeProcessor.
+ */
+ FileNodeProcessor2(String fileNodeDirPath, String processorName)
+ throws FileNodeProcessorException {
+ super(processorName);
+ for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ statParamsHashMap.put(statConstant.name(), new AtomicLong(0));
+ }
+ statStorageDeltaName =
+ MonitorConstants.STAT_STORAGE_GROUP_PREFIX +
MonitorConstants.MONITOR_PATH_SEPERATOR
+ + MonitorConstants.FILE_NODE_PATH +
MonitorConstants.MONITOR_PATH_SEPERATOR
+ + processorName.replaceAll("\\.", "_");
+
+ this.parameters = new HashMap<>();
+ String dirPath = fileNodeDirPath;
+ if (dirPath.length() > 0
+ && dirPath.charAt(dirPath.length() - 1) != File.separatorChar) {
+ dirPath = dirPath + File.separatorChar;
+ }
+ this.baseDirPath = dirPath + processorName;
+ File dataDir = new File(this.baseDirPath);
+ if (!dataDir.exists()) {
+ dataDir.mkdirs();
+ LOGGER.info(
+ "The data directory of the filenode processor {} doesn't exist.
Create new " +
+ "directory {}",
+ getProcessorName(), baseDirPath);
+ }
+ fileNodeRestoreFilePath = new File(dataDir, processorName +
RESTORE_FILE_SUFFIX).getPath();
+ try {
+ fileNodeProcessorStore = readStoreFromDisk();
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error(
+ "The fileNode processor {} encountered an error when recoverying
restore " +
+ "information.",
+ processorName, e);
+ throw new FileNodeProcessorException(e);
+ }
+ // TODO deep clone the lastupdate time
+ lastUpdateTimeMap = fileNodeProcessorStore.getLastUpdateTimeMap();
+ emptyIntervalFileNode = fileNodeProcessorStore.getEmptyIntervalFileNode();
+ newFileNodes = fileNodeProcessorStore.getNewFileNodes();
+ isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus();
+ numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile();
+ invertedIndexOfFiles = new HashMap<>();
+ // deep clone
+ flushLastUpdateTimeMap = new HashMap<>();
+ for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
+ flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
+ }
+ // construct the fileschema
+ try {
+ this.fileSchema = constructFileSchema(processorName);
+ } catch (WriteProcessException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ // status is not NONE, or the last intervalFile is not closed
+ if (isMerging != FileNodeProcessorStatus.NONE
+ || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed())) {
+ shouldRecovery = true;
+ } else {
+ // add file into the index of file
+ addAllFileIntoIndex(newFileNodes);
+ }
+ // RegistStatService
+ if (TsFileDBConf.enableStatMonitor) {
+ StatMonitor statMonitor = StatMonitor.getInstance();
+ registStatMetadata();
+ statMonitor.registStatistics(statStorageDeltaName, this);
+ }
+ try {
+ versionController = new SimpleFileVersionController(fileNodeDirPath);
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+
+ @Override
+ public Map<String, AtomicLong> getStatParamsHashMap() {
+ return statParamsHashMap;
+ }
+
+ @Override
+ public void registStatMetadata() {
+ Map<String, String> hashMap = new HashMap<>();
+ for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ hashMap
+ .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR
+ statConstant.name(),
+ MonitorConstants.DATA_TYPE);
+ }
+ StatMonitor.getInstance().registStatStorageGroup(hashMap);
+ }
+
+ @Override
+ public List<String> getAllPathForStatistic() {
+ List<String> list = new ArrayList<>();
+ for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
+ MonitorConstants.FileNodeProcessorStatConstants.values()) {
+ list.add(
+ statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR +
statConstant.name());
+ }
+ return list;
+ }
+
+ @Override
+ public Map<String, TSRecord> getAllStatisticsValue() {
+ Long curTime = System.currentTimeMillis();
+ HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
+ TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
+
+ Map<String, AtomicLong> hashMap = getStatParamsHashMap();
+ tsRecord.dataPointList = new ArrayList<>();
+ for (Entry<String, AtomicLong> entry : hashMap.entrySet()) {
+ tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(),
entry.getValue().get()));
+ }
+
+ tsRecordHashMap.put(statStorageDeltaName, tsRecord);
+ return tsRecordHashMap;
+ }
+
+ /**
+ * add interval FileNode.
+ */
+ void addIntervalFileNode(String baseDir, String fileName) throws
ActionException {
+
+ IntervalFileNode intervalFileNode = new
IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
+ fileName);
+ this.currentIntervalFileNode = intervalFileNode;
+ newFileNodes.add(intervalFileNode);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ flushFileNodeProcessorAction.act();
+ }
+
+ /**
+ * set interval filenode start time.
+ *
+ * @param deviceId device ID
+ */
+ void setIntervalFileNodeStartTime(String deviceId) {
+ if (currentIntervalFileNode.getStartTime(deviceId) == -1) {
+ currentIntervalFileNode.setStartTime(deviceId,
flushLastUpdateTimeMap.get(deviceId));
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ invertedIndexOfFiles.put(deviceId, new ArrayList<>());
+ }
+ invertedIndexOfFiles.get(deviceId).add(currentIntervalFileNode);
+ }
+ }
+
+ /**
+ * clear filenode.
+ */
+ public void clearFileNode() {
+ isOverflowed = false;
+ emptyIntervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE,
null);
+ newFileNodes = new ArrayList<>();
+ isMerging = FileNodeProcessorStatus.NONE;
+ numOfMergeFile = 0;
+ fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
+ fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ fileNodeProcessorStore.setNumOfMergeFile(numOfMergeFile);
+ fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ }
+
+ private void addAllFileIntoIndex(List<IntervalFileNode> fileList) {
+ // clear map
+ invertedIndexOfFiles.clear();
+ // add all file to index
+ for (IntervalFileNode fileNode : fileList) {
+ if (fileNode.getStartTimeMap().isEmpty()) {
+ continue;
+ }
+ for (String deviceId : fileNode.getStartTimeMap().keySet()) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ invertedIndexOfFiles.put(deviceId, new ArrayList<>());
+ }
+ invertedIndexOfFiles.get(deviceId).add(fileNode);
+ }
+ }
+ }
+
+ public boolean shouldRecovery() {
+ return shouldRecovery;
+ }
+
+ public boolean isOverflowed() {
+ return isOverflowed;
+ }
+
+ /**
+ * if overflow insert, update and delete write into this filenode processor,
set
+ * <code>isOverflowed</code> to true.
+ */
+ public void setOverflowed(boolean isOverflowed) {
+ if (this.isOverflowed != isOverflowed) {
+ this.isOverflowed = isOverflowed;
+ }
+ }
+
+ public FileNodeProcessorStatus getFileNodeProcessorStatus() {
+ return isMerging;
+ }
+
+ /**
+ * execute filenode recovery.
+ */
+ public void fileNodeRecovery() throws FileNodeProcessorException {
+ // restore bufferwrite
+ if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()) {
+ //
+ // add the current file
+ //
+ currentIntervalFileNode = newFileNodes.get(newFileNodes.size() - 1);
+
+ // this bufferwrite file is not close by normal operation
+ String damagedFilePath = newFileNodes.get(newFileNodes.size() -
1).getFilePath();
+ String[] fileNames = damagedFilePath.split("\\" + File.separator);
+ // all information to recovery the damaged file.
+ // contains file seriesPath, action parameters and processorName
+ parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION,
bufferwriteFlushAction);
+ parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION,
bufferwriteCloseAction);
+ parameters
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ String baseDir = directories
+ .getTsFileFolder(newFileNodes.get(newFileNodes.size() -
1).getBaseDirIndex());
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "The filenode processor {} will recovery the bufferwrite
processor, "
+ + "the bufferwrite file is {}",
+ getProcessorName(), fileNames[fileNames.length - 1]);
+ }
+
+ try {
+ bufferWriteProcessor = new BufferWriteProcessor(baseDir,
getProcessorName(),
+ fileNames[fileNames.length - 1], parameters, versionController,
fileSchema);
+ } catch (BufferWriteProcessorException e) {
+ LOGGER.error(
+ "The filenode processor {} failed to recovery the bufferwrite
processor, "
+ + "the last bufferwrite file is {}.",
+ getProcessorName(), fileNames[fileNames.length - 1]);
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ // restore the overflow processor
+ LOGGER.info("The filenode processor {} will recovery the overflow
processor.",
+ getProcessorName());
+ parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION,
overflowFlushAction);
+ parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ try {
+ overflowProcessor = new OverflowProcessor(getProcessorName(),
parameters, fileSchema,
+ versionController);
+ } catch (IOException e) {
+ LOGGER.error("The filenode processor {} failed to recovery the overflow
processor.",
+ getProcessorName());
+ throw new FileNodeProcessorException(e);
+ }
+
+ shouldRecovery = false;
+
+ if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
+ // re-merge all file
+ // if bufferwrite processor is not null, and close
+ LOGGER.info("The filenode processor {} is recovering, the filenode
status is {}.",
+ getProcessorName(), isMerging);
+ merge();
+ } else if (isMerging == FileNodeProcessorStatus.WAITING) {
+ // unlock
+ LOGGER.info("The filenode processor {} is recovering, the filenode
status is {}.",
+ getProcessorName(), isMerging);
+ //writeUnlock();
+ switchWaitingToWorking();
+ } else {
+ //writeUnlock();
+ }
+ // add file into index of file
+ addAllFileIntoIndex(newFileNodes);
+ }
+
+ /**
+ * get buffer write processor by processor name and insert time.
+ */
+ public BufferWriteProcessor getBufferWriteProcessor(String processorName,
long insertTime)
+ throws FileNodeProcessorException {
+ if (bufferWriteProcessor == null) {
+ Map<String, Action> params = new HashMap<>();
+ params.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION,
bufferwriteFlushAction);
+ params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION,
bufferwriteCloseAction);
+ params
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ String baseDir = directories.getNextFolderForTsfile();
+ LOGGER.info("Allocate folder {} for the new bufferwrite processor.",
baseDir);
+ // construct processor or restore
+ try {
+ bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
+ insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ + System.currentTimeMillis(),
+ params, versionController, fileSchema);
+ } catch (BufferWriteProcessorException e) {
+ LOGGER.error("The filenode processor {} failed to get the bufferwrite
processor.",
+ processorName, e);
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ return bufferWriteProcessor;
+ }
+
+ /**
+ * get buffer write processor.
+ */
+ public BufferWriteProcessor getBufferWriteProcessor() throws
FileNodeProcessorException {
+ if (bufferWriteProcessor == null) {
+ LOGGER.error("The bufferwrite processor is null when get the
bufferwriteProcessor");
+ throw new FileNodeProcessorException("The bufferwrite processor is
null");
+ }
+ return bufferWriteProcessor;
+ }
+
+ /**
+ * get overflow processor by processor name.
+ */
+ public OverflowProcessor getOverflowProcessor(String processorName) throws
IOException {
+ if (overflowProcessor == null) {
+ Map<String, Action> params = new HashMap<>();
+ // construct processor or restore
+ params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
+ params
+ .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION,
flushFileNodeProcessorAction);
+ overflowProcessor = new OverflowProcessor(processorName, params,
fileSchema,
+ versionController);
+ }
+ return overflowProcessor;
+ }
+
+ /**
+ * get overflow processor.
+ */
+ public OverflowProcessor getOverflowProcessor() {
+ if (overflowProcessor == null) {
+ LOGGER.error("The overflow processor is null when getting the
overflowProcessor");
+ }
+ return overflowProcessor;
+ }
+
+ boolean hasOverflowProcessor() {
+ return overflowProcessor != null;
+ }
+
+ public void setBufferwriteProcessroToClosed() {
+
+ bufferWriteProcessor = null;
+ }
+
+ public boolean hasBufferwriteProcessor() {
+
+ return bufferWriteProcessor != null;
+ }
+
+ /**
+ * set last update time.
+ */
+ void setLastUpdateTime(String deviceId, long timestamp) {
+ if (!lastUpdateTimeMap.containsKey(deviceId) ||
lastUpdateTimeMap.get(deviceId) < timestamp) {
+ lastUpdateTimeMap.put(deviceId, timestamp);
+ }
+ }
+
+ /**
+ * get last update time.
+ */
+ long getLastUpdateTime(String deviceId) {
+
+ if (lastUpdateTimeMap.containsKey(deviceId)) {
+ return lastUpdateTimeMap.get(deviceId);
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * get flush last update time.
+ */
+ long getFlushLastUpdateTime(String deviceId) {
+ if (!flushLastUpdateTimeMap.containsKey(deviceId)) {
+ flushLastUpdateTimeMap.put(deviceId, 0L);
+ }
+ return flushLastUpdateTimeMap.get(deviceId);
+ }
+
+
+ /**
+ * For insert overflow.
+ */
+ void changeTypeToChanged(String deviceId, long timestamp) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ LOGGER.warn(
+ WARN_NO_SUCH_OVERFLOWED_FILE
+ + "the data is [device:{},time:{}]",
+ getProcessorName(), deviceId, timestamp);
+ emptyIntervalFileNode.setStartTime(deviceId, 0L);
+ emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
+ emptyIntervalFileNode.changeTypeToChanged(isMerging);
+ } else {
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
+ int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
+ changeTypeToChanged(temp.get(index), deviceId);
+ }
+ }
+
+ private void changeTypeToChanged(IntervalFileNode fileNode, String deviceId)
{
+ fileNode.changeTypeToChanged(isMerging);
+ if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
+ fileNode.addMergeChanged(deviceId);
+ }
+ }
+
+ /**
+ * For update overflow.
+ */
+ public void changeTypeToChanged(String deviceId, long startTime, long
endTime) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ LOGGER.warn(
+ WARN_NO_SUCH_OVERFLOWED_FILE
+ + "the data is [device:{}, start time:{}, end time:{}]",
+ getProcessorName(), deviceId, startTime, endTime);
+ emptyIntervalFileNode.setStartTime(deviceId, 0L);
+ emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
+ emptyIntervalFileNode.changeTypeToChanged(isMerging);
+ } else {
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
+ int left = searchIndexNodeByTimestamp(deviceId, startTime, temp);
+ int right = searchIndexNodeByTimestamp(deviceId, endTime, temp);
+ for (int i = left; i <= right; i++) {
+ changeTypeToChanged(temp.get(i), deviceId);
+ }
+ }
+ }
+
+ /**
+ * For delete overflow.
+ */
+ public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
+ if (!invertedIndexOfFiles.containsKey(deviceId)) {
+ LOGGER.warn(
+ WARN_NO_SUCH_OVERFLOWED_FILE
+ + "the data is [device:{}, delete time:{}]",
+ getProcessorName(), deviceId, timestamp);
+ emptyIntervalFileNode.setStartTime(deviceId, 0L);
+ emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
+ emptyIntervalFileNode.changeTypeToChanged(isMerging);
+ } else {
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
+ int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
+ for (int i = 0; i <= index; i++) {
+ temp.get(i).changeTypeToChanged(isMerging);
+ if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
+ temp.get(i).addMergeChanged(deviceId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Search the index of the interval by the timestamp.
+ *
+ * @return index of interval
+ */
+ private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
+ List<IntervalFileNode> fileList) {
+ int index = 1;
+ while (index < fileList.size()) {
+ if (timestamp < fileList.get(index).getStartTime(deviceId)) {
+ break;
+ } else {
+ index++;
+ }
+ }
+ return index - 1;
+ }
+
+ /**
+ * add multiple pass lock.
+ */
+ public int addMultiPassLock() {
+ LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
+ newMultiPassLock.readLock().lock();
+ while (newMultiPassTokenSet.contains(multiPassLockToken)) {
+ multiPassLockToken++;
+ }
+ newMultiPassTokenSet.add(multiPassLockToken);
+ LOGGER.debug("Add multi token:{}, nsPath:{}.", multiPassLockToken,
getProcessorName());
+ return multiPassLockToken;
+ }
+
+ /**
+ * remove multiple pass lock. TODO: use the return value or remove it.
+ */
+ public boolean removeMultiPassLock(int token) {
+ if (newMultiPassTokenSet.contains(token)) {
+ newMultiPassLock.readLock().unlock();
+ newMultiPassTokenSet.remove(token);
+ LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}",
token,
+ getProcessorName(),
+ newMultiPassTokenSet, newMultiPassLock);
+ return true;
+ } else if (oldMultiPassTokenSet != null &&
oldMultiPassTokenSet.contains(token)) {
+ // remove token first, then unlock
+ oldMultiPassLock.readLock().unlock();
+ oldMultiPassTokenSet.remove(token);
+ LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token,
oldMultiPassTokenSet,
+ oldMultiPassLock);
+ return true;
+ } else {
+ LOGGER.error("remove token error:{},new set:{}, old set:{}", token,
newMultiPassTokenSet,
+ oldMultiPassTokenSet);
+ // should add throw exception
+ return false;
+ }
+ }
+
+ /**
+ * query data.
+ */
+ public <T extends Comparable<T>> QueryDataSource query(String deviceId,
String measurementId,
+ QueryContext context) throws FileNodeProcessorException {
+ // query overflow data
+ MeasurementSchema mSchema;
+ TSDataType dataType;
+
+ //mSchema = mManager.getSchemaForOnePath(deviceId + "." + measurementId);
+ mSchema = fileSchema.getMeasurementSchema(measurementId);
+ dataType = mSchema.getType();
+
+ OverflowSeriesDataSource overflowSeriesDataSource;
+ try {
+ overflowSeriesDataSource = overflowProcessor.query(deviceId,
measurementId, dataType,
+ mSchema.getProps(), context);
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ // tsfile dataØØ
+ List<IntervalFileNode> bufferwriteDataInFiles = new ArrayList<>();
+ for (IntervalFileNode intervalFileNode : newFileNodes) {
+ // add the same intervalFileNode, but not the same reference
+ if (intervalFileNode.isClosed()) {
+ bufferwriteDataInFiles.add(intervalFileNode.backUp());
+ }
+ }
+ Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata = new
Pair<>(null, null);
+ // bufferwrite data
+ UnsealedTsFile unsealedTsFile = null;
+
+ if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()
+ && !newFileNodes.get(newFileNodes.size() -
1).getStartTimeMap().isEmpty()) {
+ unsealedTsFile = new UnsealedTsFile();
+ unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() -
1).getFilePath());
+ if (bufferWriteProcessor == null) {
+ LOGGER.error(
+ "The last of tsfile {} in filenode processor {} is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName());
+ throw new FileNodeProcessorException(String.format(
+ "The last of tsfile %s in filenode processor %s is not closed, "
+ + "but the bufferwrite processor is null.",
+ newFileNodes.get(newFileNodes.size() - 1).getRelativePath(),
getProcessorName()));
+ }
+ bufferwritedata = bufferWriteProcessor
+ .queryBufferWriteData(deviceId, measurementId, dataType,
mSchema.getProps());
+
+ try {
+ List<Modification> pathModifications = context.getPathModifications(
+ currentIntervalFileNode.getModFile(), deviceId
+ + IoTDBConstant.PATH_SEPARATOR + measurementId
+ );
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(bufferwritedata.right,
pathModifications);
+ }
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+
+ unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
+ }
+ GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new
GlobalSortedSeriesDataSource(
+ new Path(deviceId + "." + measurementId), bufferwriteDataInFiles,
unsealedTsFile,
+ bufferwritedata.left);
+ return new QueryDataSource(globalSortedSeriesDataSource,
overflowSeriesDataSource);
+
+ }
+
+ /**
+ * append one specified tsfile to this filenode processor.
+ * TODO
+ * @param appendFile the appended tsfile information
+ * @param appendFilePath the seriesPath of appended file
+ */
+ void appendFile(IntervalFileNode appendFile, String appendFilePath)
+ throws FileNodeProcessorException {
+ try {
+ if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
+ new File(appendFile.getFilePath()).getParentFile().mkdirs();
+ }
+ // move file
+ File originFile = new File(appendFilePath);
+ File targetFile = new File(appendFile.getFilePath());
+ if (!originFile.exists()) {
+ throw new FileNodeProcessorException(
+ String.format("The appended file %s does not exist.",
appendFilePath));
+ }
+ if (targetFile.exists()) {
+ throw new FileNodeProcessorException(
+ String.format("The appended target file %s already exists.",
+ appendFile.getFilePath()));
+ }
+ if (!originFile.renameTo(targetFile)) {
+ LOGGER.warn("File renaming failed when appending new file. Origin: {},
target: {}",
+ originFile.getPath(),
+ targetFile.getPath());
+ }
+ // append the new tsfile
+ this.newFileNodes.add(appendFile);
+ // update the lastUpdateTime
+ for (Entry<String, Long> entry : appendFile.getEndTimeMap().entrySet()) {
+ lastUpdateTimeMap.put(entry.getKey(), entry.getValue());
+ }
+ bufferwriteFlushAction.act();
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ // reconstruct the inverted index of the newFileNodes
+ flushFileNodeProcessorAction.act();
+ addAllFileIntoIndex(newFileNodes);
+ } catch (Exception e) {
+ LOGGER.error("Failed to append the tsfile {} to filenode processor {}.",
appendFile,
+ getProcessorName());
+ throw new FileNodeProcessorException(e);
+ }
+ }
+
+ /**
+ * get overlap tsfiles which are conflict with the appendFile.
+ *
+ * @param appendFile the appended tsfile information
+ */
+ public List<String> getOverlapFiles(IntervalFileNode appendFile, String uuid)
+ throws FileNodeProcessorException {
+ List<String> overlapFiles = new ArrayList<>();
+ try {
+ for (IntervalFileNode intervalFileNode : newFileNodes) {
+ getOverlapFiles(appendFile, intervalFileNode, uuid, overlapFiles);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Failed to get overlap tsfiles which conflict with the
appendFile.");
+ throw new FileNodeProcessorException(e);
+ }
+ return overlapFiles;
+ }
+
+ private void getOverlapFiles(IntervalFileNode appendFile, IntervalFileNode
intervalFileNode,
+ String uuid, List<String> overlapFiles) throws IOException {
+ for (Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
+ if (intervalFileNode.getStartTimeMap().containsKey(entry.getKey()) &&
+ intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
+ && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
+ .getEndTime(entry.getKey())) {
+ String relativeFilePath = "postback" + File.separator + uuid +
File.separator + "backup"
+ + File.separator + intervalFileNode.getRelativePath();
+ File newFile = new File(
+
Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
+ relativeFilePath);
+ if (!newFile.getParentFile().exists()) {
+ newFile.getParentFile().mkdirs();
+ }
+ java.nio.file.Path link =
FileSystems.getDefault().getPath(newFile.getPath());
+ java.nio.file.Path target = FileSystems.getDefault()
+ .getPath(intervalFileNode.getFilePath());
+ Files.createLink(link, target);
+ overlapFiles.add(newFile.getPath());
+ break;
+ }
+ }
+ }
+
+ /**
+ * add time series.
+ */
+ public void addTimeSeries(String measurementId, TSDataType dataType,
TSEncoding encoding,
+ CompressionType compressor, Map<String, String> props) {
+ fileSchema.registerMeasurement(new MeasurementSchema(measurementId,
dataType, encoding,
+ compressor, props));
+ }
+
+ /**
+ * submit the merge task to the <code>MergePool</code>.
+ *
+ * @return null -can't submit the merge task, because this filenode is not
overflowed or it is
+ * merging now. Future - submit the merge task successfully.
+ */
+ Future submitToMerge() {
+ ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
+ if (lastMergeTime > 0) {
+ long thisMergeTime = System.currentTimeMillis();
+ long mergeTimeInterval = thisMergeTime - lastMergeTime;
+ ZonedDateTime lastDateTime =
ofInstant(Instant.ofEpochMilli(lastMergeTime),
+ zoneId);
+ ZonedDateTime thisDateTime =
ofInstant(Instant.ofEpochMilli(thisMergeTime),
+ zoneId);
+ LOGGER.info(
+ "The filenode {} last merge time is {}, this merge time is {}, "
+ + "merge time interval is {}s",
+ getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval /
1000);
+ }
+ lastMergeTime = System.currentTimeMillis();
+
+ if (overflowProcessor != null) {
+ if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
+ .getConfig().overflowFileSizeThreshold) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Skip this merge taks submission, because the size{} of overflow
processor {} "
+ + "does not reaches the threshold {}.",
+ MemUtils.bytesCntToStr(overflowProcessor.getFileSize()),
getProcessorName(),
+ MemUtils.bytesCntToStr(
+
IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+ }
+ return null;
+ }
+ } else {
+ LOGGER.info(
+ "Skip this merge taks submission, because the filenode processor {} "
+ + "has no overflow processor.",
+ getProcessorName());
+ return null;
+ }
+ if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
+ Runnable mergeThread;
+ mergeThread = new MergeRunnale();
+ LOGGER.info("Submit the merge task, the merge filenode is {}",
getProcessorName());
+ return MergeManager.getInstance().submit(mergeThread);
+ } else {
+ if (!isOverflowed) {
+ LOGGER.info(
+ "Skip this merge taks submission, because the filenode processor
{} is not " +
+ "overflowed.",
+ getProcessorName());
+ } else {
+ LOGGER.warn(
+ "Skip this merge task submission, because last merge task is not
over yet, "
+ + "the merge filenode processor is {}",
+ getProcessorName());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Prepare for merge, close the bufferwrite and overflow.
+ */
+ private void prepareForMerge() {
+ try {
+ LOGGER.info("The filenode processor {} prepares for merge, closes the
bufferwrite processor",
+ getProcessorName());
+ closeBufferWrite();
+ // try to get overflow processor
+ getOverflowProcessor(getProcessorName());
+ // must close the overflow processor
+ while (!getOverflowProcessor().canBeClosed()) {
+ waitForClosing();
+ }
+ LOGGER.info("The filenode processor {} prepares for merge, closes the
overflow processor",
+ getProcessorName());
+ getOverflowProcessor().close();
+ } catch (FileNodeProcessorException | OverflowProcessorException |
IOException e) {
+ LOGGER.error("The filenode processor {} prepares for merge error.",
getProcessorName());
+ writeUnlock();
+ throw new ErrorDebugException(e);
+ }
+ }
+
+ private void waitForClosing() {
+ try {
+ LOGGER.info(
+ "The filenode processor {} prepares for merge, the overflow {} can't
be closed, "
+ + "wait 100ms,",
+ getProcessorName(), getProcessorName());
+ TimeUnit.MICROSECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Merge this storage group, merge the tsfile data with overflow data.
+ */
+ public void merge() throws FileNodeProcessorException {
+ // close bufferwrite and overflow, prepare for merge
+ LOGGER.info("The filenode processor {} begins to merge.",
getProcessorName());
+ prepareForMerge();
+ // change status from overflowed to no overflowed
+ isOverflowed = false;
+ // change status from work to merge
+ isMerging = FileNodeProcessorStatus.MERGING_WRITE;
+ // check the empty file
+ Map<String, Long> startTimeMap = emptyIntervalFileNode.getStartTimeMap();
+ mergeCheckEmptyFile(startTimeMap);
+
+ for (IntervalFileNode intervalFileNode : newFileNodes) {
+ if (intervalFileNode.getOverflowChangeType() !=
OverflowChangeType.NO_CHANGE) {
+ intervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
+ }
+ }
+
+ addAllFileIntoIndex(newFileNodes);
+ synchronized (fileNodeProcessorStore) {
+ fileNodeProcessorStore.setOverflowed(isOverflowed);
+ fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ // flush this filenode information
+ try {
+ writeStoreToDisk(fileNodeProcessorStore);
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error("The filenode processor {} writes restore information
error when merging.",
+ getProcessorName(), e);
+ writeUnlock();
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ // add numOfMergeFile to control the number of the merge file
+ List<IntervalFileNode> backupIntervalFiles;
+
+ backupIntervalFiles = switchFileNodeToMerge();
+ //
+ // clear empty file
+ //
+ boolean needEmtpy = false;
+ if (emptyIntervalFileNode.getOverflowChangeType() !=
OverflowChangeType.NO_CHANGE) {
+ needEmtpy = true;
+ }
+ emptyIntervalFileNode.clear();
+ // attention
+ try {
+ overflowProcessor.switchWorkToMerge();
+ } catch (IOException e) {
+ LOGGER.error("The filenode processor {} can't switch overflow processor
from work to merge.",
+ getProcessorName(), e);
+ writeUnlock();
+ throw new FileNodeProcessorException(e);
+ }
+ LOGGER.info("The filenode processor {} switches from {} to {}.",
getProcessorName(),
+ FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
+ writeUnlock();
+
+ // query tsfile data and overflow data, and merge them
+ int numOfMergeFiles = 0;
+ int allNeedMergeFiles = backupIntervalFiles.size();
+ for (IntervalFileNode backupIntervalFile : backupIntervalFiles) {
+ numOfMergeFiles++;
+ if (backupIntervalFile.getOverflowChangeType() ==
OverflowChangeType.CHANGED) {
+ // query data and merge
+ String filePathBeforeMerge = backupIntervalFile.getRelativePath();
+ try {
+ LOGGER.info(
+ "The filenode processor {} begins merging the {}/{} tsfile[{}]
with "
+ + "overflow file, the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge,
+ (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) *
100));
+ long startTime = System.currentTimeMillis();
+ String newFile = queryAndWriteDataForMerge(backupIntervalFile);
+ long endTime = System.currentTimeMillis();
+ long timeConsume = endTime - startTime;
+ ZoneId zoneId =
IoTDBDescriptor.getInstance().getConfig().getZoneID();
+ LOGGER.info(
+ "The fileNode processor {} has merged the {}/{} tsfile[{}->{}]
over, "
+ + "start time of merge is {}, end time of merge is {}, "
+ + "time consumption is {}ms,"
+ + " the process is {}%",
+ getProcessorName(), numOfMergeFiles, allNeedMergeFiles,
filePathBeforeMerge,
+ newFile, ofInstant(Instant.ofEpochMilli(startTime),
+ zoneId), ofInstant(Instant.ofEpochMilli(endTime), zoneId),
timeConsume,
+ numOfMergeFiles / (float) allNeedMergeFiles * 100);
+ } catch (IOException | PathErrorException e) {
+ LOGGER.error("Merge: query and write data error.", e);
+ throw new FileNodeProcessorException(e);
+ }
+ } else if (backupIntervalFile.getOverflowChangeType() ==
OverflowChangeType.MERGING_CHANGE) {
+ LOGGER.error("The overflowChangeType of backupIntervalFile must not be
{}",
+ OverflowChangeType.MERGING_CHANGE);
+ // handle this error, throw one runtime exception
+ throw new FileNodeProcessorException(
+ "The overflowChangeType of backupIntervalFile must not be "
+ + OverflowChangeType.MERGING_CHANGE);
+ } else {
+ LOGGER.debug(
+ "The filenode processor {} is merging, the interval file {}
doesn't "
+ + "need to be merged.",
+ getProcessorName(), backupIntervalFile.getRelativePath());
+ }
+ }
+
+ // change status from merge to wait
+ switchMergeToWaiting(backupIntervalFiles, needEmtpy);
+
+ // change status from wait to work
+ switchWaitingToWorking();
+ }
+
+ private void mergeCheckEmptyFile(Map<String, Long> startTimeMap) {
+ if (emptyIntervalFileNode.getOverflowChangeType() ==
OverflowChangeType.NO_CHANGE) {
+ return;
+ }
+ Iterator<Entry<String, Long>> iterator =
emptyIntervalFileNode.getEndTimeMap().entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Entry<String, Long> entry = iterator.next();
+ String deviceId = entry.getKey();
+ if (invertedIndexOfFiles.containsKey(deviceId)) {
+
invertedIndexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
+ startTimeMap.remove(deviceId);
+ iterator.remove();
+ }
+ }
+ if (emptyIntervalFileNode.checkEmpty()) {
+ emptyIntervalFileNode.clear();
+ } else {
+ if (!newFileNodes.isEmpty()) {
+ IntervalFileNode first = newFileNodes.get(0);
+ for (String deviceId :
emptyIntervalFileNode.getStartTimeMap().keySet()) {
+ first.setStartTime(deviceId,
emptyIntervalFileNode.getStartTime(deviceId));
+ first.setEndTime(deviceId,
emptyIntervalFileNode.getEndTime(deviceId));
+ first.setOverflowChangeType(OverflowChangeType.CHANGED);
+ }
+ emptyIntervalFileNode.clear();
+ } else {
+
emptyIntervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
+ }
+ }
+ }
+
+ private List<IntervalFileNode> switchFileNodeToMerge() throws
FileNodeProcessorException {
+ List<IntervalFileNode> result = new ArrayList<>();
+ if (emptyIntervalFileNode.getOverflowChangeType() !=
OverflowChangeType.NO_CHANGE) {
+ // add empty
+ result.add(emptyIntervalFileNode.backUp());
+ if (!newFileNodes.isEmpty()) {
+ throw new FileNodeProcessorException(
+ String.format("The status of empty file is %s, but the new file
list is not empty",
+ emptyIntervalFileNode.getOverflowChangeType()));
+ }
+ return result;
+ }
+ if (newFileNodes.isEmpty()) {
+ LOGGER.error("No file was changed when merging, the filenode is {}",
getProcessorName());
+ throw new FileNodeProcessorException(
+ "No file was changed when merging, the filenode is " +
getProcessorName());
+ }
+ for (IntervalFileNode intervalFileNode : newFileNodes) {
+ updateFileNode(intervalFileNode, result);
+ }
+ return result;
+ }
+
+ private void updateFileNode(IntervalFileNode intervalFileNode,
List<IntervalFileNode> result) {
+ if (intervalFileNode.getOverflowChangeType() ==
OverflowChangeType.NO_CHANGE) {
+ result.add(intervalFileNode.backUp());
+ } else {
+ Map<String, Long> startTimeMap = new HashMap<>();
+ Map<String, Long> endTimeMap = new HashMap<>();
+ for (String deviceId : intervalFileNode.getEndTimeMap().keySet()) {
+ List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
+ int index = temp.indexOf(intervalFileNode);
+ int size = temp.size();
+ // start time
+ if (index == 0) {
+ startTimeMap.put(deviceId, 0L);
+ } else {
+ startTimeMap.put(deviceId, intervalFileNode.getStartTime(deviceId));
+ }
+ // end time
+ if (index < size - 1) {
+ endTimeMap.put(deviceId, temp.get(index + 1).getStartTime(deviceId)
- 1);
+ } else {
+ endTimeMap.put(deviceId, intervalFileNode.getEndTime(deviceId));
+ }
+ }
+ IntervalFileNode node = new IntervalFileNode(startTimeMap, endTimeMap,
+ intervalFileNode.getOverflowChangeType(),
intervalFileNode.getBaseDirIndex(),
+ intervalFileNode.getRelativePath());
+ result.add(node);
+ }
+ }
+
+ private void switchMergeToWaiting(List<IntervalFileNode>
backupIntervalFiles, boolean needEmpty)
+ throws FileNodeProcessorException {
+ LOGGER.info("The status of filenode processor {} switches from {} to {}.",
getProcessorName(),
+ FileNodeProcessorStatus.MERGING_WRITE,
FileNodeProcessorStatus.WAITING);
+ writeLock();
+ try {
+ oldMultiPassTokenSet = newMultiPassTokenSet;
+ oldMultiPassLock = newMultiPassLock;
+ newMultiPassTokenSet = new HashSet<>();
+ newMultiPassLock = new ReentrantReadWriteLock(false);
+ List<IntervalFileNode> result = new ArrayList<>();
+ int beginIndex = 0;
+ if (needEmpty) {
+ IntervalFileNode empty = backupIntervalFiles.get(0);
+ if (!empty.checkEmpty()) {
+ updateEmpty(empty, result);
+ beginIndex++;
+ }
+ }
+ // reconstruct the file index
+ addAllFileIntoIndex(backupIntervalFiles);
+ // check the merge changed file
+ for (int i = beginIndex; i < backupIntervalFiles.size(); i++) {
+ IntervalFileNode newFile = newFileNodes.get(i - beginIndex);
+ IntervalFileNode temp = backupIntervalFiles.get(i);
+ if (newFile.getOverflowChangeType() ==
OverflowChangeType.MERGING_CHANGE) {
+ updateMergeChanged(newFile, temp);
+ }
+ if (!temp.checkEmpty()) {
+ result.add(temp);
+ }
+ }
+ // add new file when merge
+ for (int i = backupIntervalFiles.size() - beginIndex; i <
newFileNodes.size(); i++) {
+ IntervalFileNode fileNode = newFileNodes.get(i);
+ if (fileNode.isClosed()) {
+ result.add(fileNode.backUp());
+ } else {
+ result.add(fileNode);
+ }
+ }
+
+ isMerging = FileNodeProcessorStatus.WAITING;
+ newFileNodes = result;
+ // reconstruct the index
+ addAllFileIntoIndex(newFileNodes);
+ // clear merge changed
+ for (IntervalFileNode fileNode : newFileNodes) {
+ fileNode.clearMergeChanged();
+ }
+
+ synchronized (fileNodeProcessorStore) {
+ fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
+ fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+ try {
+ writeStoreToDisk(fileNodeProcessorStore);
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error(
+ "Merge: failed to write filenode information to revocery file,
the filenode is " +
+ "{}.",
+ getProcessorName(), e);
+ throw new FileNodeProcessorException(
+ "Merge: write filenode information to revocery file failed, the
filenode is "
+ + getProcessorName());
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void updateEmpty(IntervalFileNode empty, List<IntervalFileNode>
result) {
+ for (String deviceId : empty.getStartTimeMap().keySet()) {
+ if (invertedIndexOfFiles.containsKey(deviceId)) {
+ IntervalFileNode temp = invertedIndexOfFiles.get(deviceId).get(0);
+ if (temp.getMergeChanged().contains(deviceId)) {
+ empty.setOverflowChangeType(OverflowChangeType.CHANGED);
+ break;
+ }
+ }
+ }
+ empty.clearMergeChanged();
+ result.add(empty.backUp());
+ }
+
+ private void updateMergeChanged(IntervalFileNode newFile, IntervalFileNode
temp) {
+ for (String deviceId : newFile.getMergeChanged()) {
+ if (temp.getStartTimeMap().containsKey(deviceId)) {
+ temp.setOverflowChangeType(OverflowChangeType.CHANGED);
+ } else {
+ changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
+ newFile.getEndTime(deviceId));
+ }
+ }
+ }
+
+
+ private void switchWaitingToWorking()
+ throws FileNodeProcessorException {
+
+ LOGGER.info("The status of filenode processor {} switches from {} to {}.",
getProcessorName(),
+ FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
+
+ if (oldMultiPassLock != null) {
+ LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple
Pass Lock is {}",
+ oldMultiPassTokenSet,
+ oldMultiPassLock);
+ oldMultiPassLock.writeLock().lock();
+ }
+ try {
+ writeLock();
+ try {
+ // delete the all files which are in the newFileNodes
+ // notice: the last restore file of the interval file
+
+ List<String> bufferwriteDirPathList =
directories.getAllTsFileFolders();
+ List<File> bufferwriteDirList = new ArrayList<>();
+ collectBufferWriteDirs(bufferwriteDirPathList, bufferwriteDirList);
+
+ Set<String> bufferFiles = new HashSet<>();
+ collectBufferWriteFiles(bufferFiles);
+
+ // add the restore file, if the last file is not closed
+ if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() -
1).isClosed()) {
+ String bufferFileRestorePath =
+ newFileNodes.get(newFileNodes.size() - 1).getFilePath() +
RESTORE_FILE_SUFFIX;
+ bufferFiles.add(bufferFileRestorePath);
+ }
+
+ deleteBufferWriteFiles(bufferwriteDirList, bufferFiles);
+
+ // merge switch
+ changeFileNodes();
+
+ // overflow switch from merge to work
+ overflowProcessor.switchMergeToWork();
+ // write status to file
+ isMerging = FileNodeProcessorStatus.NONE;
+ synchronized (fileNodeProcessorStore) {
+ fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
+ fileNodeProcessorStore.setNewFileNodes(newFileNodes);
+
fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
+ writeStoreToDisk(fileNodeProcessorStore);
+ }
+ } catch (IOException e) {
+ LOGGER.info(
+ "The filenode processor {} encountered an error when its "
+ + "status switched from {} to {}.",
+ getProcessorName(), FileNodeProcessorStatus.NONE,
+ FileNodeProcessorStatus.MERGING_WRITE);
+ throw new FileNodeProcessorException(e);
+ } finally {
+ writeUnlock();
+ }
+ } finally {
+ oldMultiPassTokenSet = null;
+ if (oldMultiPassLock != null) {
+ oldMultiPassLock.writeLock().unlock();
+ }
+ oldMultiPassLock = null;
+ }
+
+ }
+
+ private void collectBufferWriteDirs(List<String> bufferwriteDirPathList,
+ List<File> bufferwriteDirList) {
+ for (String bufferwriteDirPath : bufferwriteDirPathList) {
+ if (bufferwriteDirPath.length() > 0
+ && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1)
+ != File.separatorChar) {
+ bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
+ }
+ bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
+ File bufferwriteDir = new File(bufferwriteDirPath);
+ bufferwriteDirList.add(bufferwriteDir);
+ if (!bufferwriteDir.exists()) {
+ bufferwriteDir.mkdirs();
+ }
+ }
+ }
+
+ private void collectBufferWriteFiles(Set<String> bufferFiles) {
+ for (IntervalFileNode bufferFileNode : newFileNodes) {
+ String bufferFilePath = bufferFileNode.getFilePath();
+ if (bufferFilePath != null) {
+ bufferFiles.add(bufferFilePath);
+ }
+ }
+ }
+
+ private void deleteBufferWriteFiles(List<File> bufferwriteDirList,
Set<String> bufferFiles) {
+ for (File bufferwriteDir : bufferwriteDirList) {
+ File[] files = bufferwriteDir.listFiles();
+ if (files == null) {
+ continue;
+ }
+ for (File file : files) {
+ if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
+ LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+ }
+ }
+ }
+ }
+
+ private void changeFileNodes() {
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
+ fileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
+ }
+ }
+ }
+
+ private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
+ throws IOException, FileNodeProcessorException, PathErrorException {
+ Map<String, Long> startTimeMap = new HashMap<>();
+ Map<String, Long> endTimeMap = new HashMap<>();
+
+ mergeFileWriter = null;
+ mergeOutputPath = null;
+ mergeBaseDir = null;
+ mergeFileName = null;
+ // modifications are blocked before mergeModification is created to avoid
+ // losing some modification.
+ mergeDeleteLock.lock();
+ QueryContext context = new QueryContext();
+ try {
+ for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
+ // query one deviceId
+ List<Path> pathList = new ArrayList<>();
+ mergeIsChunkGroupHasData = false;
+ mergeStartPos = -1;
+ ChunkGroupFooter footer;
+ int numOfChunk = 0;
+ try {
+ List<String> pathStrings =
mManager.getLeafNodePathInNextLevel(deviceId);
+ for (String string : pathStrings) {
+ pathList.add(new Path(string));
+ }
+ } catch (PathErrorException e) {
+ LOGGER.error("Can't get all the paths from MManager, the deviceId is
{}", deviceId);
+ throw new FileNodeProcessorException(e);
+ }
+ if (pathList.isEmpty()) {
+ continue;
+ }
+ for (Path path : pathList) {
+ // query one measurement in the special deviceId
+ String measurementId = path.getMeasurement();
+ TSDataType dataType = mManager.getSeriesType(path.getFullPath());
+ OverflowSeriesDataSource overflowSeriesDataSource =
overflowProcessor.queryMerge(deviceId,
+ measurementId, dataType, true, context);
+ Filter timeFilter = FilterFactory
+ .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+ TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+ SingleSeriesExpression seriesFilter = new
SingleSeriesExpression(path, timeFilter);
+ IReader seriesReader = SeriesReaderFactory.getInstance()
+ .createSeriesReaderForMerge(backupIntervalFile,
+ overflowSeriesDataSource, seriesFilter, context);
+ numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter,
dataType,
+ startTimeMap, endTimeMap);
+ }
+ if (mergeIsChunkGroupHasData) {
+ // end the new rowGroupMetadata
+ long size = mergeFileWriter.getPos() - mergeStartPos;
+ footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
+ mergeFileWriter.endChunkGroup(footer, 0);
+ }
+ }
+ } finally {
+ if (mergeDeleteLock.isLocked()) {
+ mergeDeleteLock.unlock();
+ }
+ }
+
+ if (mergeFileWriter != null) {
+ mergeFileWriter.endFile(fileSchema);
+ }
+
backupIntervalFile.setBaseDirIndex(directories.getTsFileFolderIndex(mergeBaseDir));
+ backupIntervalFile.setRelativePath(mergeFileName);
+ backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
+ backupIntervalFile.setStartTimeMap(startTimeMap);
+ backupIntervalFile.setEndTimeMap(endTimeMap);
+ backupIntervalFile.setModFile(mergingModification);
+ mergingModification = null;
+ return mergeFileName;
+ }
+
+ private int queryAndWriteSeries(IReader seriesReader, Path path,
+ SingleSeriesExpression seriesFilter, TSDataType dataType,
+ Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+ throws IOException {
+ int numOfChunk = 0;
+ try {
+ if (!seriesReader.hasNext()) {
+ LOGGER.debug(
+ "The time-series {} has no data with the filter {} in the filenode
processor {}",
+ path, seriesFilter, getProcessorName());
+ } else {
+ numOfChunk++;
+ TimeValuePair timeValuePair = seriesReader.next();
+ if (mergeFileWriter == null) {
+ mergeBaseDir = directories.getNextFolderForTsfile();
+ mergeFileName = timeValuePair.getTimestamp()
+ + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR +
System.currentTimeMillis();
+ mergeOutputPath = constructOutputFilePath(mergeBaseDir,
getProcessorName(),
+ mergeFileName);
+ mergeFileName = getProcessorName() + File.separatorChar +
mergeFileName;
+ mergeFileWriter = new TsFileIOWriter(new File(mergeOutputPath));
+ mergingModification = new ModificationFile(mergeOutputPath
+ + ModificationFile.FILE_SUFFIX);
+ mergeDeleteLock.unlock();
+ }
+ if (!mergeIsChunkGroupHasData) {
+ // start a new rowGroupMetadata
+ mergeIsChunkGroupHasData = true;
+ // the datasize and numOfChunk is fake
+ // the accurate datasize and numOfChunk will get after write all
this device data.
+ mergeFileWriter.startFlushChunkGroup(path.getDevice());// TODO
please check me.
+ mergeStartPos = mergeFileWriter.getPos();
+ }
+ // init the serieswWriteImpl
+ MeasurementSchema measurementSchema = fileSchema
+ .getMeasurementSchema(path.getMeasurement());
+ ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
+ int pageSizeThreshold = TSFileConfig.pageSizeInByte;
+ ChunkWriterImpl seriesWriterImpl = new
ChunkWriterImpl(measurementSchema, pageWriter,
+ pageSizeThreshold);
+ // write the series data
+ writeOneSeries(path.getDevice(), seriesWriterImpl, dataType,
+ seriesReader,
+ startTimeMap, endTimeMap, timeValuePair);
+ // flush the series data
+ seriesWriterImpl.writeToFileWriter(mergeFileWriter);
+ }
+ } finally {
+ seriesReader.close();
+ }
+ return numOfChunk;
+ }
+
+
+ private void writeOneSeries(String deviceId, ChunkWriterImpl
seriesWriterImpl,
+ TSDataType dataType, IReader seriesReader, Map<String, Long>
startTimeMap,
+ Map<String, Long> endTimeMap, TimeValuePair firstTVPair) throws
IOException {
+ long startTime;
+ long endTime;
+ TimeValuePair localTV = firstTVPair;
+ writeTVPair(seriesWriterImpl, dataType, localTV);
+ startTime = endTime = localTV.getTimestamp();
+ if (!startTimeMap.containsKey(deviceId) || startTimeMap.get(deviceId) >
startTime) {
+ startTimeMap.put(deviceId, startTime);
+ }
+ if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) <
endTime) {
+ endTimeMap.put(deviceId, endTime);
+ }
+ while (seriesReader.hasNext()) {
+ localTV = seriesReader.next();
+ endTime = localTV.getTimestamp();
+ writeTVPair(seriesWriterImpl, dataType, localTV);
+ }
+ if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) <
endTime) {
+ endTimeMap.put(deviceId, endTime);
+ }
+ }
+
+ private void writeTVPair(ChunkWriterImpl seriesWriterImpl, TSDataType
dataType,
+ TimeValuePair timeValuePair) throws IOException {
+ switch (dataType) {
+ case BOOLEAN:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBoolean());
+ break;
+ case INT32:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getInt());
+ break;
+ case INT64:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getLong());
+ break;
+ case FLOAT:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getDouble());
+ break;
+ case TEXT:
+ seriesWriterImpl.write(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBinary());
+ break;
+ default:
+ LOGGER.error("Not support data type: {}", dataType);
+ break;
+ }
+ }
+
+
+ private String constructOutputFilePath(String baseDir, String processorName,
String fileName) {
+
+ String localBaseDir = baseDir;
+ if (localBaseDir.charAt(localBaseDir.length() - 1) != File.separatorChar) {
+ localBaseDir = localBaseDir + File.separatorChar + processorName;
+ }
+ File dataDir = new File(localBaseDir);
+ if (!dataDir.exists()) {
+ LOGGER.warn("The bufferwrite processor data dir doesn't exists, create
new directory {}",
+ localBaseDir);
+ dataDir.mkdirs();
+ }
+ File outputFile = new File(dataDir, fileName);
+ return outputFile.getPath();
+ }
+
+ private FileSchema constructFileSchema(String processorName) throws
WriteProcessException {
+
+ List<MeasurementSchema> columnSchemaList;
+ columnSchemaList = mManager.getSchemaForFileName(processorName);
+
+ FileSchema schema = new FileSchema();
+ for (MeasurementSchema measurementSchema : columnSchemaList) {
+ schema.registerMeasurement(measurementSchema);
+ }
+ return schema;
+
+ }
+
+ @Override
+ public boolean canBeClosed() {
+ if (isMerging != FileNodeProcessorStatus.NONE) {
+ LOGGER.info("The filenode {} can't be closed, because the filenode
status is {}",
+ getProcessorName(),
+ isMerging);
+ return false;
+ }
+ if (!newMultiPassLock.writeLock().tryLock()) {
+ LOGGER.info("The filenode {} can't be closed, because it can't get
newMultiPassLock {}",
+ getProcessorName(), newMultiPassLock);
+ return false;
+ }
+
+ try {
+ if (oldMultiPassLock == null) {
+ return true;
+ }
+ if (oldMultiPassLock.writeLock().tryLock()) {
+ try {
+ return true;
+ } finally {
+ oldMultiPassLock.writeLock().unlock();
+ }
+ } else {
+ LOGGER.info("The filenode {} can't be closed, because it can't get"
+ + " oldMultiPassLock {}",
+ getProcessorName(), oldMultiPassLock);
+ return false;
+ }
+ } finally {
+ newMultiPassLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public FileNodeFlushFuture flush() throws IOException {
+ Future<Boolean> bufferWriteFlushFuture = null;
+ Future<Boolean> overflowFlushFuture = null;
+ if (bufferWriteProcessor != null) {
+ bufferWriteFlushFuture = bufferWriteProcessor.flush();
+ }
+ if (overflowProcessor != null) {
+ overflowFlushFuture = overflowProcessor.flush();
+ }
+ return new FileNodeFlushFuture(bufferWriteFlushFuture,
overflowFlushFuture);
+ }
+
+ /**
+ * Close the bufferwrite processor.
+ */
+ public void closeBufferWrite() throws FileNodeProcessorException {
+ if (bufferWriteProcessor == null) {
+ return;
+ }
+ try {
+ while (!bufferWriteProcessor.canBeClosed()) {
+ waitForBufferWriteClose();
+ }
+ bufferWriteProcessor.close();
+ bufferWriteProcessor = null;
+ } catch (BufferWriteProcessorException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+
+ private void waitForBufferWriteClose() {
+ try {
+ LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
+ bufferWriteProcessor.getProcessorName());
+ TimeUnit.MICROSECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interruption", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Close the overflow processor.
+ */
+ public void closeOverflow() throws FileNodeProcessorException {
+ if (overflowProcessor == null) {
+ return;
+ }
+ try {
+ while (!overflowProcessor.canBeClosed()) {
+ waitForOverflowClose();
+ }
+ overflowProcessor.close();
+ overflowProcessor.clear();
+ overflowProcessor = null;
+ } catch (OverflowProcessorException | IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+
+ private void waitForOverflowClose() {
+ try {
+ LOGGER.info("The overflow {} can't be closed, wait 100ms",
+ overflowProcessor.getProcessorName());
+ TimeUnit.MICROSECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interruption", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void close() throws FileNodeProcessorException {
+ closeBufferWrite();
+ closeOverflow();
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode.getModFile() != null) {
+ try {
+ fileNode.getModFile().close();
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * deregister the filenode processor.
+ */
+ public void delete() throws ProcessorException {
+ if (TsFileDBConf.enableStatMonitor) {
+ // remove the monitor
+ LOGGER.info("Deregister the filenode processor: {} from monitor.",
getProcessorName());
+ StatMonitor.getInstance().deregistStatistics(statStorageDeltaName);
+ }
+ closeBufferWrite();
+ closeOverflow();
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode.getModFile() != null) {
+ try {
+ fileNode.getModFile().close();
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public long memoryUsage() {
+ long memSize = 0;
+ if (bufferWriteProcessor != null) {
+ memSize += bufferWriteProcessor.memoryUsage();
+ }
+ if (overflowProcessor != null) {
+ memSize += overflowProcessor.memoryUsage();
+ }
+ return memSize;
+ }
+
+ private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
+ throws FileNodeProcessorException {
+
+ synchronized (fileNodeRestoreLock) {
+ SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
+ try {
+ serializeUtil.serialize(fileNodeProcessorStore,
fileNodeRestoreFilePath);
+ LOGGER.debug("The filenode processor {} writes restore information to
the restore file",
+ getProcessorName());
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ }
+
+ private FileNodeProcessorStore readStoreFromDisk() throws
FileNodeProcessorException {
+
+ synchronized (fileNodeRestoreLock) {
+ FileNodeProcessorStore processorStore;
+ SerializeUtil<FileNodeProcessorStore> serializeUtil = new
SerializeUtil<>();
+ try {
+ processorStore = serializeUtil.deserialize(fileNodeRestoreFilePath)
+ .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
+ new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
+ new ArrayList<>(), FileNodeProcessorStatus.NONE, 0));
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ return processorStore;
+ }
+ }
+
+ String getFileNodeRestoreFilePath() {
+ return fileNodeRestoreFilePath;
+ }
+
+ /**
+ * Delete data whose timestamp <= 'timestamp' and belong to timeseries
deviceId.measurementId.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the delete range is (0, timestamp].
+ */
+ public void delete(String deviceId, String measurementId, long timestamp)
throws IOException {
+ // TODO: how to avoid partial deletion?
+ mergeDeleteLock.lock();
+ long version = versionController.nextVersion();
+
+ // record what files are updated so we can roll back them in case of
exception
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+
+ try {
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
+ if (mergingModification != null) {
+ mergingModification.write(deletion);
+ updatedModFiles.add(mergingModification);
+ }
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ // delete data in memory
+ OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
+ ofProcessor.delete(deviceId, measurementId, timestamp, version,
updatedModFiles);
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ }
+ } catch (Exception e) {
+ // roll back
+ for (ModificationFile modFile : updatedModFiles) {
+ modFile.abort();
+ }
+ throw new IOException(e);
+ } finally {
+ mergeDeleteLock.unlock();
+ }
+ }
+
+ private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
+ List<ModificationFile> updatedModFiles) throws IOException {
+ if (currentIntervalFileNode != null &&
currentIntervalFileNode.containsDevice(deviceId)) {
+ currentIntervalFileNode.getModFile().write(deletion);
+ updatedModFiles.add(currentIntervalFileNode.getModFile());
+ }
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode != currentIntervalFileNode &&
fileNode.containsDevice(deviceId)
+ && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) {
+ fileNode.getModFile().write(deletion);
+ updatedModFiles.add(fileNode.getModFile());
+ }
+ }
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in BufferWrite. Only used by
WAL recovery.
+ */
+ public void deleteBufferWrite(String deviceId, String measurementId, long
timestamp)
+ throws IOException {
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ long version = versionController.nextVersion();
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
+
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+ try {
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ } catch (IOException e) {
+ for (ModificationFile modificationFile : updatedModFiles) {
+ modificationFile.abort();
+ }
+ throw e;
+ }
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ }
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in Overflow. Only used by WAL
recovery.
+ */
+ public void deleteOverflow(String deviceId, String measurementId, long
timestamp)
+ throws IOException {
+ long version = versionController.nextVersion();
+
+ OverflowProcessor overflowProcessor =
getOverflowProcessor(getProcessorName());
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+ try {
+ overflowProcessor.delete(deviceId, measurementId, timestamp, version,
updatedModFiles);
+ } catch (IOException e) {
+ for (ModificationFile modificationFile : updatedModFiles) {
+ modificationFile.abort();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ FileNodeProcessor2 that = (FileNodeProcessor2) o;
+ return isOverflowed == that.isOverflowed &&
+ numOfMergeFile == that.numOfMergeFile &&
+ lastMergeTime == that.lastMergeTime &&
+ shouldRecovery == that.shouldRecovery &&
+ multiPassLockToken == that.multiPassLockToken &&
+ Objects.equals(statStorageDeltaName, that.statStorageDeltaName) &&
+ Objects.equals(statParamsHashMap, that.statParamsHashMap) &&
+ Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) &&
+ Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) &&
+ Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) &&
+ Objects.equals(emptyIntervalFileNode, that.emptyIntervalFileNode) &&
+ Objects.equals(currentIntervalFileNode, that.currentIntervalFileNode)
&&
+ Objects.equals(newFileNodes, that.newFileNodes) &&
+ isMerging == that.isMerging &&
+ Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) &&
+ Objects.equals(fileNodeRestoreFilePath, that.fileNodeRestoreFilePath)
&&
+ Objects.equals(baseDirPath, that.baseDirPath) &&
+ Objects.equals(bufferWriteProcessor, that.bufferWriteProcessor) &&
+ Objects.equals(overflowProcessor, that.overflowProcessor) &&
+ Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
+ Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
+ Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
+ Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
+ Objects.equals(parameters, that.parameters) &&
+ Objects.equals(fileSchema, that.fileSchema) &&
+ Objects.equals(flushFileNodeProcessorAction,
that.flushFileNodeProcessorAction) &&
+ Objects.equals(bufferwriteFlushAction, that.bufferwriteFlushAction) &&
+ Objects.equals(bufferwriteCloseAction, that.bufferwriteCloseAction) &&
+ Objects.equals(overflowFlushAction, that.overflowFlushAction);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), statStorageDeltaName,
statParamsHashMap, isOverflowed,
+ lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
+ emptyIntervalFileNode, currentIntervalFileNode, newFileNodes,
isMerging,
+ numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath,
baseDirPath,
+ lastMergeTime, bufferWriteProcessor, overflowProcessor,
oldMultiPassTokenSet,
+ newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock,
shouldRecovery, parameters,
+ fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
+ bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
+ }
+
+ public class MergeRunnale implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
+ long mergeStartTime = System.currentTimeMillis();
+ writeLock();
+ merge();
+ long mergeEndTime = System.currentTimeMillis();
+ long intervalTime = mergeEndTime - mergeStartTime;
+ LOGGER.info(
+ "The filenode processor {} merge start time is {}, "
+ + "merge end time is {}, merge consumes {}ms.",
+ getProcessorName(), ofInstant(Instant.ofEpochMilli(mergeStartTime),
+ zoneId), ofInstant(Instant.ofEpochMilli(mergeEndTime),
+ zoneId), intervalTime);
+ } catch (FileNodeProcessorException e) {
+ LOGGER.error("The filenode processor {} encountered an error when
merging.",
+ getProcessorName(), e);
+ throw new ErrorDebugException(e);
+ }
+ }
+ }
+}
\ No newline at end of file