This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch nvmlogging
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/nvmlogging by this push:
new 8fe9e7f update recovery (no tsDataMap)
8fe9e7f is described below
commit 8fe9e7fe104bb0e47145010b851c9963d1fac9f5
Author: mdf369 <[email protected]>
AuthorDate: Mon Jan 6 11:38:34 2020 +0800
update recovery (no tsDataMap)
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 5 +-
.../db/nvm/memtable/NVMPrimitiveMemTable.java | 21 ++++
.../iotdb/db/nvm/memtable/NVMWritableMemChunk.java | 6 +
.../apache/iotdb/db/nvm/metadata/DataTypeMemo.java | 16 +--
.../iotdb/db/nvm/metadata/FreeSpaceBitMap.java | 12 +-
.../iotdb/db/nvm/metadata/NVMSpaceMetadata.java | 8 +-
.../apache/iotdb/db/nvm/metadata/SpaceCount.java | 18 +++
.../apache/iotdb/db/nvm/metadata/TSDataMap.java | 15 ++-
.../iotdb/db/nvm/metadata/TimeValueMapper.java | 9 +-
.../nvm/recover/NVMMemtableRecoverPerformer.java | 140 ++++++++++++++++-----
.../iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java | 4 +-
.../apache/iotdb/db/nvm/space/NVMDataSpace.java | 44 ++++---
.../apache/iotdb/db/nvm/space/NVMSpaceManager.java | 93 ++++++--------
.../db/nvm/space/NVMSpaceMetadataManager.java | 108 ++++++++++++++++
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +
.../iotdb/db/utils/datastructure/NVMTVList.java | 40 +++++-
.../iotdb/db/writelog/recover/LogReplayer.java | 1 +
.../writelog/recover/TsFileRecoverPerformer.java | 43 +++++--
18 files changed, 440 insertions(+), 145 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index f9d88af..6ad4dfd 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -265,4 +264,8 @@ public abstract class AbstractMemTable implements IMemTable
{
}
}
}
+
+ public String getStorageGroupId() {
+ return storageGroupId;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
index d4d583f..45118d2 100644
---
a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMPrimitiveMemTable.java
@@ -1,15 +1,19 @@
package org.apache.iotdb.db.nvm.memtable;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
public class NVMPrimitiveMemTable extends AbstractMemTable {
@@ -63,4 +67,21 @@ public class NVMPrimitiveMemTable extends AbstractMemTable {
}
return new ReadOnlyMemChunk(dataType, sorter, props);
}
+
+ public void loadData(Map<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>> dataMap) {
+ for (Entry<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>> deviceDataEntry : dataMap
+ .entrySet()) {
+ String deviceId = deviceDataEntry.getKey();
+ Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>> dataOfDevice =
deviceDataEntry.getValue();
+ for (Entry<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>
measurementDataEntry : dataOfDevice
+ .entrySet()) {
+ String measurementId = measurementDataEntry.getKey();
+ Pair<List<NVMDataSpace>, List<NVMDataSpace>> tvListPair =
measurementDataEntry.getValue();
+ TSDataType dataType = tvListPair.right.get(0).getDataType();
+
+ NVMWritableMemChunk memChunk = (NVMWritableMemChunk)
createIfNotExistAndGet(deviceId, measurementId, dataType);
+ memChunk.loadData(tvListPair.left, tvListPair.right);
+ }
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
index 5e5092c..6a4f72e 100644
---
a/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/memtable/NVMWritableMemChunk.java
@@ -3,6 +3,7 @@ package org.apache.iotdb.db.nvm.memtable;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.utils.datastructure.NVMTVList;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary;
@@ -267,4 +268,9 @@ public class NVMWritableMemChunk implements
IWritableMemChunk {
public void delete(long upperBound) {
list.delete(upperBound);
}
+
+ public void loadData(List<NVMDataSpace> timeSpaceList, List<NVMDataSpace>
valueSpaceList) {
+ // TODO how about abstract
+ list.loadData(timeSpaceList, valueSpaceList);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
index 97cbe79..3b3e484 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/DataTypeMemo.java
@@ -1,24 +1,24 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class DataTypeMemo extends NVMSpaceMetadata {
- public DataTypeMemo(ByteBuffer byteBuffer) {
- super(byteBuffer);
+ public DataTypeMemo(NVMSpace space) {
+ super(space);
}
public void set(int index, TSDataType dataType) {
- byteBuffer.putShort(index, dataType.serialize());
+ space.getByteBuffer().putShort(index, dataType.serialize());
}
- public List<TSDataType> getDataTypeList() {
- List<TSDataType> dataTypeList = new ArrayList<>();
- for (int i = 0; i < byteBuffer.capacity(); i++) {
- TSDataType dataType = TSDataType.deserialize(byteBuffer.getShort(i));
+ public List<TSDataType> getDataTypeList(int num) {
+ List<TSDataType> dataTypeList = new ArrayList<>(num);
+ for (int i = 0; i < num; i++) {
+ TSDataType dataType =
TSDataType.deserialize(space.getByteBuffer().getShort(i));
dataTypeList.add(dataType);
}
return dataTypeList;
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
index 2b7967b..fb72503 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/FreeSpaceBitMap.java
@@ -1,23 +1,23 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
public class FreeSpaceBitMap extends NVMSpaceMetadata {
- public FreeSpaceBitMap(ByteBuffer byteBuffer) {
- super(byteBuffer);
+ public FreeSpaceBitMap(NVMSpace space) {
+ super(space);
}
public void update(int index, boolean setFree) {
- byteBuffer.put(index, setFree ? (byte) 0 : (byte) 1);
+ space.getByteBuffer().put(index, setFree ? (byte) 0 : (byte) 1);
}
public Set<Integer> getValidSpaceIndexSet() {
Set<Integer> freeSpaceIndexList = new HashSet<>();
- for (int i = 0; i < byteBuffer.capacity(); i++) {
- byte flag = byteBuffer.get(i);
+ for (int i = 0; i < space.getSize(); i++) {
+ byte flag = space.getByteBuffer().get(i);
if (flag == 1) {
freeSpaceIndexList.add(i);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
index 05c88a7..d5ef0be 100644
---
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/NVMSpaceMetadata.java
@@ -1,12 +1,12 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.nio.ByteBuffer;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
public abstract class NVMSpaceMetadata {
- protected ByteBuffer byteBuffer;
+ protected NVMSpace space;
- public NVMSpaceMetadata(ByteBuffer byteBuffer) {
- this.byteBuffer = byteBuffer;
+ public NVMSpaceMetadata(NVMSpace space) {
+ this.space = space;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java
new file mode 100644
index 0000000..b3c7474
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/SpaceCount.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.db.nvm.metadata;
+
+import org.apache.iotdb.db.nvm.space.NVMSpace;
+
+public class SpaceCount extends NVMSpaceMetadata {
+
+ public SpaceCount(NVMSpace space) {
+ super(space);
+ }
+
+ public void put(int v) {
+ space.getByteBuffer().putInt(0, v);
+ }
+
+ public int get() {
+ return space.getByteBuffer().getInt(0);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
index d93e73b..2f48fdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TSDataMap.java
@@ -1,23 +1,22 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
import org.apache.iotdb.tsfile.utils.Pair;
public class TSDataMap extends NVMSpaceMetadata {
- public TSDataMap(ByteBuffer byteBuffer) {
- super(byteBuffer);
+ public TSDataMap(NVMSpace space) {
+ super(space);
}
- public void addSpaceToTimeSeries(String sgId, String deviceId, String
measurementId,
- int timeSpaceIndex,
- int valueSpaceIndex) {
-
+ public void addSpaceToTimeSeries(int timeSpaceIndex, int valueSpaceIndex,
String sgId,
+ String deviceId, String measurementId) {
+ // TODO
}
- public Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>>
generateTSPathTVPairListMap() {
+ public Map<String, Map<String, Map<String, Pair<List<Integer>,
List<Integer>>>>> generateTSPathTVPairListMap() {
return null;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
index 7f3f057..d08b7b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/metadata/TimeValueMapper.java
@@ -1,15 +1,14 @@
package org.apache.iotdb.db.nvm.metadata;
-import java.nio.ByteBuffer;
+import org.apache.iotdb.db.nvm.space.NVMSpace;
public class TimeValueMapper extends NVMSpaceMetadata {
- public TimeValueMapper(ByteBuffer byteBuffer) {
- super(byteBuffer);
+ public TimeValueMapper(NVMSpace space) {
+ super(space);
}
public void map(int timeSpaceIndex, int valueSpaceIndex) {
- byteBuffer.putInt(timeSpaceIndex);
- byteBuffer.putInt(valueSpaceIndex);
+ space.getByteBuffer().putInt(timeSpaceIndex, valueSpaceIndex);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
index afc33f3..273796d 100644
---
a/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/recover/NVMMemtableRecoverPerformer.java
@@ -1,49 +1,123 @@
package org.apache.iotdb.db.nvm.recover;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-import java.util.Iterator;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
-import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
-import org.apache.iotdb.db.nvm.metadata.TSDataMap;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.nvm.memtable.NVMPrimitiveMemTable;
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
+import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
+import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager;
import org.apache.iotdb.tsfile.utils.Pair;
public class NVMMemtableRecoverPerformer {
- private FileChannel nvmFileChannel;
- private final MapMode MAP_MODE = MapMode.READ_WRITE;
-
- /**
- * metadata fields
- */
- private FreeSpaceBitMap freeSpaceBitMap;
- private DataTypeMemo dataTypeMemo;
- private TSDataMap tsDataMap;
-
- public Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>>
getValidTSPathTVPairListMap() {
- Set<Integer> validSpaceIndexSet = freeSpaceBitMap.getValidSpaceIndexSet();
- Map<String, Map<String, Map<String, List<Pair<Integer, Integer>>>>>
tsTVMap = tsDataMap.generateTSPathTVPairListMap();
- for (Map<String, Map<String, List<Pair<Integer, Integer>>>> dmTVMap :
tsTVMap.values()) {
- for (Map<String, List<Pair<Integer, Integer>>> mTVMap :
dmTVMap.values()) {
- for (List<Pair<Integer, Integer>> tvList : mTVMap.values()) {
- Iterator<Pair<Integer, Integer>> iterator = tvList.iterator();
- while (iterator.hasNext()) {
- Pair<Integer, Integer> tvPair = iterator.next();
- if (!validSpaceIndexSet.contains(tvPair.left) ||
!validSpaceIndexSet.contains(tvPair.right)) {
- iterator.remove();
- }
- }
+ private final static NVMMemtableRecoverPerformer INSTANCE = new
NVMMemtableRecoverPerformer();
+
+ private Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>>> dataMap;
+
+ private NVMMemtableRecoverPerformer() {}
+
+ public void init() throws StartupException {
+ try {
+ dataMap = recoverDataInNVM();
+ } catch (IOException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ public static NVMMemtableRecoverPerformer getInstance() {
+ return INSTANCE;
+ }
+
+ private Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>>> recoverDataInNVM()
+ throws IOException {
+ Map<String, Map<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>>> dataMap = new HashMap<>();
+ Map<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>>
indexMap = NVMSpaceMetadataManager.getInstance().getValidSpaceIndexMap();
+ List<NVMDataSpace> dataList =
NVMSpaceManager.getInstance().getAllNVMData();
+
+ for (Entry<String, Map<String, Map<String, Pair<List<Integer>,
List<Integer>>>>> sgIndexEntry : indexMap
+ .entrySet()) {
+ Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>
deviceDataMap = new HashMap<>(sgIndexEntry.getValue().size());
+ dataMap.put(sgIndexEntry.getKey(), deviceDataMap);
+
+ for (Entry<String, Map<String, Pair<List<Integer>, List<Integer>>>>
deviceIndexEntry : sgIndexEntry
+ .getValue().entrySet()) {
+ Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>
measurementDataMap = new HashMap<>(deviceIndexEntry.getValue().size());
+ deviceDataMap.put(deviceIndexEntry.getKey(), measurementDataMap);
+
+ for (Entry<String, Pair<List<Integer>, List<Integer>>>
measurementIndexEntry : deviceIndexEntry
+ .getValue().entrySet()) {
+ List<NVMDataSpace> timeList =
convertIndexListToDataList(measurementIndexEntry.getValue().left, dataList);
+ List<NVMDataSpace> valueList =
convertIndexListToDataList(measurementIndexEntry.getValue().right, dataList);
+ Pair<List<NVMDataSpace>, List<NVMDataSpace>> tvDataListPair = new
Pair<>(timeList, valueList);
+ measurementDataMap.put(measurementIndexEntry.getKey(),
tvDataListPair);
}
}
}
- return tsTVMap;
+ return dataMap;
}
- public void reconstructMemtable(String sgId, PrimitiveMemTable memTable) {
+ private List<NVMDataSpace> convertIndexListToDataList(List<Integer>
indexList, List<NVMDataSpace> totalDataList) {
+ List<NVMDataSpace> dataList = new ArrayList<>(indexList.size());
+ for (Integer index : indexList) {
+ dataList.add(totalDataList.get(index));
+ }
+ return dataList;
+ }
+
+ public void reconstructMemtable(NVMPrimitiveMemTable memTable,
TsFileResource tsFileResource) {
+ String sgId = memTable.getStorageGroupId();
+ Map<String, Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>>
dataOfSG = dataMap.get(sgId);
+ memTable.loadData(dataOfSG);
+
+ Map<String, Long>[] maps = getMinMaxTimeMapFromData(dataOfSG);
+ Map<String, Long> minTimeMap = maps[0];
+ Map<String, Long> maxTimeMap = maps[1];
+ minTimeMap.forEach((k, v) -> tsFileResource.updateStartTime(k, v));
+ maxTimeMap.forEach((k, v) -> tsFileResource.updateEndTime(k, v));
+ }
+
+ private Map<String, Long>[] getMinMaxTimeMapFromData(Map<String, Map<String,
Pair<List<NVMDataSpace>, List<NVMDataSpace>>>> sgDataMap) {
+ Map<String, Long> minTimeMap = new HashMap<>();
+ Map<String, Long> maxTimeMap = new HashMap<>();
+ for (Entry<String, Map<String, Pair<List<NVMDataSpace>,
List<NVMDataSpace>>>> deviceDataMapEntry : sgDataMap
+ .entrySet()) {
+ String deviceId = deviceDataMapEntry.getKey();
+ Map<String, Pair<List<NVMDataSpace>, List<NVMDataSpace>>>
measurementDataMap = deviceDataMapEntry.getValue();
+ long minTime = Long.MAX_VALUE;
+ long maxTime = Long.MIN_VALUE;
+ for (Pair<List<NVMDataSpace>, List<NVMDataSpace>> tvListPair :
measurementDataMap.values()) {
+ List<NVMDataSpace> timeSpaceList = tvListPair.left;
+ for (int i = 0; i < timeSpaceList.size(); i++) {
+ NVMDataSpace timeSpace = timeSpaceList.get(i);
+ int unitNum = timeSpace.getUnitNum();
+ if (i == timeSpaceList.size() - 1) {
+ unitNum = timeSpace.getValidUnitNum();
+ }
+ for (int j = 0; j < unitNum; j++) {
+ long time = (long) timeSpace.get(j);
+ minTime = Math.min(minTime, time);
+ maxTime = Math.max(maxTime, time);
+ }
+ }
+ }
+ if (minTime != Long.MAX_VALUE) {
+ minTimeMap.put(deviceId, minTime);
+ }
+ if (maxTime != Long.MIN_VALUE) {
+ maxTimeMap.put(deviceId, maxTime);
+ }
+ }
+ Map<String, Long>[] res = new Map[2];
+ res[0] = minTimeMap;
+ res[1] = maxTimeMap;
+ return res;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
index 671822a..8bf3e2e 100644
---
a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
@@ -5,6 +5,7 @@ import java.util.EnumMap;
import org.apache.iotdb.db.nvm.PerfMonitor;
import org.apache.iotdb.db.nvm.space.NVMDataSpace;
import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
+import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class NVMPrimitiveArrayPool {
@@ -42,7 +43,6 @@ public class NVMPrimitiveArrayPool {
if (nvmSpace == null) {
nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size *
ARRAY_SIZE, dataType);
}
- NVMSpaceManager.getInstance().registerNVMDataSpace(nvmSpace);
PerfMonitor.add("NVM.getDataList", System.currentTimeMillis() - time);
return nvmSpace;
@@ -52,7 +52,7 @@ public class NVMPrimitiveArrayPool {
// TODO freeslotmap?
primitiveArraysMap.get(dataType).add(nvmSpace);
- NVMSpaceManager.getInstance().unregisterNVMDataSpace(nvmSpace);
+ NVMSpaceMetadataManager.getInstance().unregisterSpace(nvmSpace);
}
// /**
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
index 2ed0229..ff203d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMDataSpace.java
@@ -5,32 +5,49 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class NVMDataSpace extends NVMSpace {
+ private final long INVALID_VALUE = 0;
+
private int index;
private TSDataType dataType;
+ private int unitSize;
NVMDataSpace(long offset, long size, ByteBuffer byteBuffer, int index,
TSDataType dataType) {
super(offset, size, byteBuffer);
this.index = index;
this.dataType = dataType;
+ unitSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
}
- @Override
- public NVMDataSpace clone() {
- return new NVMDataSpace(offset, size, cloneByteBuffer(), index, dataType);
+ public int getUnitNum() {
+ return (int) (size / unitSize);
+ }
+
+ public void refreshData() {
+ // TODO only for Long
+ for (int i = 0; i < size /
NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT64); i++) {
+ byteBuffer.putLong(i, INVALID_VALUE);
+ }
}
- public ByteBuffer cloneByteBuffer() {
- ByteBuffer clone = ByteBuffer.allocate(byteBuffer.capacity());
- byteBuffer.rewind();
- clone.put(byteBuffer);
- byteBuffer.rewind();
- clone.flip();
- return clone;
+ /**
+ * for Long only
+ * @return
+ */
+ public int getValidUnitNum() {
+ int count = 0;
+ while (true) {
+ long v = byteBuffer.getLong(count);
+ if (v == INVALID_VALUE) {
+ break;
+ }
+
+ count++;
+ }
+ return count;
}
public Object get(int index) {
- int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
- index *= objectSize;
+ index *= unitSize;
Object object = null;
switch (dataType) {
case BOOLEAN:
@@ -56,8 +73,7 @@ public class NVMDataSpace extends NVMSpace {
}
public void set(int index, Object object) {
- int objectSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
- index *= objectSize;
+ index *= unitSize;
switch (dataType) {
case BOOLEAN:
byteBuffer.put(index, (byte) object);
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
index 5ae2dc0..5a5b7e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
@@ -1,16 +1,17 @@
package org.apache.iotdb.db.nvm.space;
+import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
-import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
-import org.apache.iotdb.db.nvm.metadata.TSDataMap;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -22,21 +23,7 @@ public class NVMSpaceManager {
private static final Logger logger =
LoggerFactory.getLogger(NVMSpaceManager.class);
private static final String NVM_FILE_NAME = "nvmFile";
- private static final int NVMSPACE_NUM_MAX = 1000000;
-
- private static final long BITMAP_FIELD_OFFSET = 0L;
- private static final long BITMAP_FIELD_BYTE_SIZE = Byte.BYTES *
NVMSPACE_NUM_MAX;
-
- private static final long DATATYPE_FIELD_OFFSET = BITMAP_FIELD_OFFSET +
BITMAP_FIELD_BYTE_SIZE;
- private static final long DATATYPE_FIELD_BYTE_SIZE = Short.BYTES *
NVMSPACE_NUM_MAX;
-
- private static final long TSID_FIELD_OFFSET = DATATYPE_FIELD_OFFSET +
DATATYPE_FIELD_BYTE_SIZE;
- private static final long TSID_FIELD_BYTE_SIZE =
getPrimitiveTypeByteSize(TSDataType.INT64) * NVMSPACE_NUM_MAX;
-
- private static final long TVMAP_FIELD_OFFSET = TSID_FIELD_OFFSET +
TSID_FIELD_BYTE_SIZE;
- private static final long TVMAP_FIELD_BYTE_SIZE =
getPrimitiveTypeByteSize(TSDataType.INT32) * 2 * NVMSPACE_NUM_MAX;
-
- private static final long DATA_FILED_OFFSET = TVMAP_FIELD_OFFSET +
TVMAP_FIELD_BYTE_SIZE;
+ public static final int NVMSPACE_NUM_MAX = 1000000;
private final static NVMSpaceManager INSTANCE = new NVMSpaceManager();
@@ -45,18 +32,10 @@ public class NVMSpaceManager {
private final MapMode MAP_MODE = MapMode.READ_WRITE;
private long nvmSize;
- /**
- * metadata fields
- */
- private FreeSpaceBitMap freeSpaceBitMap;
- private DataTypeMemo dataTypeMemo;
- private TSDataMap tsDataMap;
-
- /**
- * data field
- */
private AtomicInteger curDataSpaceIndex = new AtomicInteger(0);
- private long curDataOffset = DATA_FILED_OFFSET;
+ private long curOffset = 0L;
+
+ private NVMSpaceMetadataManager metadataManager =
NVMSpaceMetadataManager.getInstance();
private NVMSpaceManager() {}
@@ -69,19 +48,13 @@ public class NVMSpaceManager {
nvmSize = nvmDirFile.getUsableSpace();
nvmFileChannel = new RandomAccessFile(nvmFilePath, "rw").getChannel();
- initMetadataFields();
+ metadataManager.init();
} catch (IOException e) {
logger.error("Fail to open NVM space at {}.", nvmFilePath, e);
throw new StartupException(e);
}
}
- private void initMetadataFields() throws IOException {
- freeSpaceBitMap = new FreeSpaceBitMap(nvmFileChannel.map(MAP_MODE,
BITMAP_FIELD_OFFSET, BITMAP_FIELD_BYTE_SIZE));
- dataTypeMemo = new DataTypeMemo(nvmFileChannel.map(MAP_MODE,
DATATYPE_FIELD_OFFSET, DATATYPE_FIELD_BYTE_SIZE));
- tsDataMap = new TSDataMap(nvmFileChannel.map(MAP_MODE, TSID_FIELD_OFFSET,
TSID_FIELD_BYTE_SIZE));
- }
-
public void close() throws IOException {
nvmFileChannel.close();
}
@@ -113,40 +86,56 @@ public class NVMSpaceManager {
return size;
}
- public synchronized NVMSpace allocateSpace(long offset, long size) throws
IOException {
- return new NVMSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset,
size));
+ public synchronized NVMSpace allocateSpace(long size) throws IOException {
+ NVMSpace nvmSpace = new NVMSpace(curOffset, size,
nvmFileChannel.map(MAP_MODE, curOffset, size));
+ curOffset += size;
+ return nvmSpace;
}
public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType
dataType) {
- // TODO check if full
+ checkIsFull();
try {
- logger.trace("Try to allocate {} nvm space at {}.", size, curDataOffset);
+ logger.trace("Try to allocate {} nvm space at {}.", size, curOffset);
+ int index = curDataSpaceIndex.getAndIncrement();
NVMDataSpace nvmSpace = new NVMDataSpace(
- curDataOffset, size, nvmFileChannel.map(MAP_MODE, curDataOffset,
size), curDataSpaceIndex
- .getAndIncrement(), dataType);
- curDataOffset += size;
+ curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size),
index, dataType);
+ nvmSpace.refreshData();
+ metadataManager.updateCount(index);
+ curOffset += size;
return nvmSpace;
} catch (IOException e) {
// TODO deal with error
- logger.error("Fail to allocate {} nvm space at {}.", size,
curDataOffset);
+ logger.error("Fail to allocate {} nvm space at {}.", size, curOffset);
e.printStackTrace();
return null;
}
}
- public void registerNVMDataSpace(NVMDataSpace nvmDataSpace) {
- int spaceIndex = nvmDataSpace.getIndex();
- freeSpaceBitMap.update(spaceIndex, false);
- dataTypeMemo.set(spaceIndex, nvmDataSpace.getDataType());
+ private void checkIsFull() {
+ // TODO
}
- public void unregisterNVMDataSpace(NVMDataSpace nvmDataSpace) {
- freeSpaceBitMap.update(nvmDataSpace.getIndex(), true);
+ public List<NVMDataSpace> getAllNVMData() throws IOException {
+ int spaceCount = metadataManager.getCount();
+ List<NVMDataSpace> nvmDataList = new ArrayList<>(spaceCount);
+ List<TSDataType> dataTypeList =
metadataManager.getDataTypeList(spaceCount);
+ long curOffset = 0;
+ for (int i = 0; i < spaceCount; i++) {
+ TSDataType dataType = dataTypeList.get(i);
+ int spaceSize = NVMSpaceManager.getPrimitiveTypeByteSize(dataType) *
ARRAY_SIZE;
+ NVMDataSpace nvmDataSpace = recoverData(curOffset, spaceSize, i,
dataType);
+ nvmDataList.add(nvmDataSpace);
+
+ curOffset += spaceSize;
+ }
+ return nvmDataList;
}
- public void addSpaceToTimeSeries(String sgId, String deviceId, String
measurementId, int timeSpaceIndex, int valueSpaceIndex) {
- tsDataMap.addSpaceToTimeSeries(sgId, deviceId, measurementId,
timeSpaceIndex, valueSpaceIndex);
+ private synchronized NVMDataSpace recoverData(long offset, long size, int
index, TSDataType dataType) throws IOException {
+ NVMDataSpace nvmSpace = new NVMDataSpace(curOffset, size,
nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType);
+ curOffset += size;
+ return nvmSpace;
}
public static NVMSpaceManager getInstance() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
new file mode 100644
index 0000000..8863408
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceMetadataManager.java
@@ -0,0 +1,108 @@
+package org.apache.iotdb.db.nvm.space;
+
+import static org.apache.iotdb.db.nvm.space.NVMSpaceManager.NVMSPACE_NUM_MAX;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.iotdb.db.nvm.metadata.DataTypeMemo;
+import org.apache.iotdb.db.nvm.metadata.FreeSpaceBitMap;
+import org.apache.iotdb.db.nvm.metadata.SpaceCount;
+import org.apache.iotdb.db.nvm.metadata.TSDataMap;
+import org.apache.iotdb.db.nvm.metadata.TimeValueMapper;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class NVMSpaceMetadataManager {
+
+ private static final long SPACE_COUNT_FIELD_BYTE_SIZE = Integer.BYTES;
+ private static final long BITMAP_FIELD_BYTE_SIZE = Byte.BYTES *
NVMSPACE_NUM_MAX;
+ private static final long DATATYPE_FIELD_BYTE_SIZE = Short.BYTES *
NVMSPACE_NUM_MAX;
+ private static final long TVMAP_FIELD_BYTE_SIZE =
NVMSpaceManager.getPrimitiveTypeByteSize(TSDataType.INT32) * NVMSPACE_NUM_MAX;
+ private static final long TIMESERIES_FIELD_BYTE_SIZE = 0;
+
+ private final static NVMSpaceMetadataManager INSTANCE = new
NVMSpaceMetadataManager();
+
+ private SpaceCount spaceCount;
+ private FreeSpaceBitMap freeSpaceBitMap;
+ private DataTypeMemo dataTypeMemo;
+ private TimeValueMapper timeValueMapper;
+ private TSDataMap tsDataMap;
+
+ private NVMSpaceManager spaceManager = NVMSpaceManager.getInstance();
+
+ private NVMSpaceMetadataManager() {}
+
+ public void init() throws IOException {
+ spaceCount = new
SpaceCount(spaceManager.allocateSpace(SPACE_COUNT_FIELD_BYTE_SIZE));
+ freeSpaceBitMap = new
FreeSpaceBitMap(spaceManager.allocateSpace(BITMAP_FIELD_BYTE_SIZE));
+ dataTypeMemo = new
DataTypeMemo(spaceManager.allocateSpace(DATATYPE_FIELD_BYTE_SIZE));
+ timeValueMapper = new
TimeValueMapper(spaceManager.allocateSpace(TVMAP_FIELD_BYTE_SIZE));
+ tsDataMap = new
TSDataMap(spaceManager.allocateSpace(TIMESERIES_FIELD_BYTE_SIZE));
+ }
+
+ public static NVMSpaceMetadataManager getInstance() {
+ return INSTANCE;
+ }
+
+ public void updateCount(int v) {
+ spaceCount.put(v);
+ }
+
+ public int getCount() {
+ return spaceCount.get();
+ }
+
+ public void registerTVSpace(NVMDataSpace timeSpace, NVMDataSpace valueSpace,
String sgId, String deviceId, String measurementId) {
+ int timeSpaceIndex = timeSpace.getIndex();
+ int valueSpaceIndex = valueSpace.getIndex();
+ freeSpaceBitMap.update(timeSpaceIndex, false);
+ freeSpaceBitMap.update(valueSpaceIndex, false);
+ dataTypeMemo.set(timeSpaceIndex, timeSpace.getDataType());
+ dataTypeMemo.set(valueSpaceIndex, valueSpace.getDataType());
+
+ timeValueMapper.map(timeSpaceIndex, valueSpaceIndex);
+ tsDataMap.addSpaceToTimeSeries(timeSpaceIndex, valueSpaceIndex, sgId,
deviceId, measurementId);
+ }
+
+ public void unregisterSpace(NVMDataSpace space) {
+ freeSpaceBitMap.update(space.getIndex(), true);
+ }
+
+ public Map<String, Map<String, Map<String, Pair<List<Integer>,
List<Integer>>>>> getValidSpaceIndexMap() {
+ Set<Integer> validSpaceIndexSet = freeSpaceBitMap.getValidSpaceIndexSet();
+ Map<String, Map<String, Map<String, Pair<List<Integer>, List<Integer>>>>>
tsTVMap = tsDataMap.generateTSPathTVPairListMap();
+ for (Map<String, Map<String, Pair<List<Integer>, List<Integer>>>> dmTVMap
: tsTVMap.values()) {
+ for (Map<String, Pair<List<Integer>, List<Integer>>> mTVMap :
dmTVMap.values()) {
+ for (Pair<List<Integer>, List<Integer>> tvIndexListPair :
mTVMap.values()) {
+ List<Integer> timeIndexList = tvIndexListPair.left;
+ List<Integer> valueIndexList = tvIndexListPair.right;
+
+ Set<Integer> toBeRemovedIndexSet = new TreeSet<>((o1, o2) -> o2 -
o1);
+ for (int i = 0; i < timeIndexList.size(); i++) {
+ if (!validSpaceIndexSet.contains(timeIndexList.get(i))) {
+ toBeRemovedIndexSet.add(i);
+ }
+ }
+ for (int i = 0; i < timeIndexList.size(); i++) {
+ if (!validSpaceIndexSet.contains(timeIndexList.get(i))) {
+ toBeRemovedIndexSet.add(i);
+ }
+ }
+
+ for (Integer index : toBeRemovedIndexSet) {
+ timeIndexList.remove(index);
+ valueIndexList.remove(index);
+ }
+ }
+ }
+ }
+ return tsTVMap;
+ }
+
+ public List<TSDataType> getDataTypeList(int count) {
+ return dataTypeMemo.getDataTypeList(count);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0095c63..3fc1cd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.nvm.recover.NVMMemtableRecoverPerformer;
import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.sync.receiver.SyncServerManager;
@@ -82,6 +83,7 @@ public class IoTDB implements IoTDBMBean {
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();
+ NVMMemtableRecoverPerformer.getInstance().init();
initMManager();
registerManager.register(StorageEngine.getInstance());
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
index 9543f6b..d84a87a 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
@@ -6,7 +6,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool;
import org.apache.iotdb.db.nvm.space.NVMDataSpace;
-import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
+import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -141,7 +141,7 @@ public abstract class NVMTVList extends AbstractTVList {
NVMDataSpace valueSpace = expandValues();
NVMDataSpace timeSpace =
NVMPrimitiveArrayPool.getInstance().getPrimitiveDataListByType(TSDataType.INT64);
timestamps.add(timeSpace);
- NVMSpaceManager.getInstance().addSpaceToTimeSeries(sgId, deviceId,
measurementId, timestamps.get(timestamps.size() - 1).getIndex(),
values.get(values.size() - 1).getIndex());
+ NVMSpaceMetadataManager.getInstance().registerTVSpace(timeSpace,
valueSpace, sgId, deviceId, measurementId);
}
}
@@ -169,4 +169,40 @@ public abstract class NVMTVList extends AbstractTVList {
}
return null;
}
+
+ /**
+  * Appends the given time and value spaces to this list and then recomputes the list metadata
+  * from the time spaces.
+  *
+  * @param timeSpaceList spaces appended to {@code timestamps}
+  * @param valueSpaceList spaces appended to {@code values}
+  */
+ public void loadData(List<NVMDataSpace> timeSpaceList, List<NVMDataSpace> valueSpaceList) {
+   timestamps.addAll(timeSpaceList);
+   values.addAll(valueSpaceList);
+
+   // only the time spaces are passed on for the metadata refresh
+   refreshMetadata(timeSpaceList);
+ }
+
+ /**
+  * Updates {@code size}, {@code minTime} and {@code sorted} from the given spaces. Every space
+  * except the last contributes {@code getUnitNum()} units; the last one contributes only
+  * {@code getValidUnitNum()} units. Does nothing when the list is empty.
+  *
+  * @param spaceList spaces whose units are read as {@code Long} timestamps
+  */
+ private void refreshMetadata(List<NVMDataSpace> spaceList) {
+   if (spaceList.isEmpty()) {
+     return;
+   }
+
+   final int tailPos = spaceList.size() - 1;
+   final NVMDataSpace tailSpace = spaceList.get(tailPos);
+   final int tailValidUnits = tailSpace.getValidUnitNum();
+   // view over every space before the last one
+   final List<NVMDataSpace> leadingSpaces = spaceList.subList(0, tailPos);
+
+   // size: leading spaces count all their units, the tail only its valid ones
+   for (NVMDataSpace leading : leadingSpaces) {
+     size += leading.getUnitNum();
+   }
+   size += tailValidUnits;
+
+   // minTime: scan the same units that were counted above
+   for (NVMDataSpace leading : leadingSpaces) {
+     for (int unit = 0; unit < leading.getUnitNum(); unit++) {
+       minTime = Math.min(minTime, (Long) leading.get(unit));
+     }
+   }
+   for (int unit = 0; unit < tailValidUnits; unit++) {
+     minTime = Math.min(minTime, (Long) tailSpace.get(unit));
+   }
+
+   // sorted: always reset to false after loading
+   sorted = false;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 46751b9..9e8d8ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -174,6 +174,7 @@ public class LogReplayer {
insertPlan.setDataTypes(dataTypes);
try {
recoverMemTable.insert(insertPlan);
+ // TODO how about NVM
} catch (Exception e) {
logger.error(
"occurs exception when replaying the record {} at timestamp {}:
{}.(Will ignore the record)",
diff --git
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 6f52c2b..8d86590 100644
---
a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -26,13 +26,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import
org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
+import org.apache.iotdb.db.nvm.memtable.NVMPrimitiveMemTable;
+import org.apache.iotdb.db.nvm.recover.NVMMemtableRecoverPerformer;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -140,15 +144,20 @@ public class TsFileRecoverPerformer {
recoverResourceFromWriter(restorableTsFileIOWriter);
}
- // redo logs
- redoLogs(restorableTsFileIOWriter);
+ // recover data in memory
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableNVM()) {
+ reloadNVMData(restorableTsFileIOWriter);
+ } else {
+ redoLogs(restorableTsFileIOWriter);
- // clean logs
- try {
- MultiFileLogNodeManager.getInstance()
- .deleteNode(logNodePrefix +
SystemFileFactory.INSTANCE.getFile(insertFilePath).getName());
- } catch (IOException e) {
- throw new StorageGroupProcessorException(e);
+ // clean logs
+ try {
+ MultiFileLogNodeManager.getInstance()
+ .deleteNode(
+ logNodePrefix +
SystemFileFactory.INSTANCE.getFile(insertFilePath).getName());
+ } catch (IOException e) {
+ throw new StorageGroupProcessorException(e);
+ }
}
}
@@ -220,7 +229,21 @@ public class TsFileRecoverPerformer {
}
}
- private void recoverNVMData() {
-
+ /**
+  * Recovers in-memory data when NVM is enabled: asks the NVM recover performer to reconstruct a
+  * memtable for this TsFile resource, flushes it synchronously if it is not empty, then ends
+  * the file and serializes the resource.
+  *
+  * @param restorableTsFileIOWriter writer the recovered memtable is flushed through and that
+  *     is ended afterwards
+  * @throws StorageGroupProcessorException wrapping any {@code IOException},
+  *     {@code InterruptedException} or {@code ExecutionException} raised while flushing or
+  *     closing
+  */
+ private void reloadNVMData(RestorableTsFileIOWriter restorableTsFileIOWriter)
+     throws StorageGroupProcessorException {
+   NVMPrimitiveMemTable rebuiltMemTable = new NVMPrimitiveMemTable(storageGroupId);
+   NVMMemtableRecoverPerformer.getInstance()
+       .reconstructMemtable(rebuiltMemTable, tsFileResource);
+   try {
+     boolean hasRecoveredData = !rebuiltMemTable.isEmpty();
+     if (hasRecoveredData) {
+       String parentDirName = tsFileResource.getFile().getParentFile().getName();
+       MemTableFlushTask flushTask =
+           new MemTableFlushTask(rebuiltMemTable, schema, restorableTsFileIOWriter, parentDirName);
+       flushTask.syncFlushMemTable();
+     }
+     // close file
+     restorableTsFileIOWriter.endFile(schema);
+     tsFileResource.serialize();
+   } catch (IOException | InterruptedException | ExecutionException e) {
+     throw new StorageGroupProcessorException(e);
+   }
+ }
}