This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e7b0ca5793b Refactor last flush time management (#11946)
e7b0ca5793b is described below
commit e7b0ca5793b09f0f2fe5696713c6f6b8fd64b5f3
Author: Haonan <[email protected]>
AuthorDate: Sun Jan 28 09:25:29 2024 +0800
Refactor last flush time management (#11946)
---
.../db/storageengine/dataregion/DataRegion.java | 17 ++--
.../dataregion/DeviceLastFlushTime.java | 50 ++++++++++++
.../dataregion/HashLastFlushTimeMap.java | 92 ++++++++--------------
.../storageengine/dataregion/ILastFlushTime.java | 29 +++++++
.../dataregion/ILastFlushTimeMap.java | 12 ++-
.../dataregion/PartitionLastFlushTime.java | 44 +++++++++++
.../dataregion/tsfile/TsFileManager.java | 36 ---------
.../rescon/memory/TimePartitionInfo.java | 4 +
.../rescon/memory/TimePartitionManager.java | 25 ++----
.../dataregion/LastFlushTimeMapTest.java | 84 +++++---------------
.../rescon/memory/TimePartitionManagerTest.java | 18 +++++
11 files changed, 218 insertions(+), 193 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2a4575bf3e7..c1a4ee9a7b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -310,7 +310,7 @@ public class DataRegion implements IDataRegionForQuery {
logger.error("create database system Directory {} failed",
storageGroupSysDir.getPath());
}
- lastFlushTimeMap = new HashLastFlushTimeMap(tsFileManager);
+ lastFlushTimeMap = new HashLastFlushTimeMap();
// recover tsfiles unless consensus protocol is ratis and storage
storageengine is not ready
if (config.isClusterMode()
@@ -2732,7 +2732,7 @@ public class DataRegion implements IDataRegionForQuery {
* Update latest time in latestTimeForEachDevice and
* partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
*/
- private void updateLastFlushTime(TsFileResource newTsFileResource) {
+ protected void updateLastFlushTime(TsFileResource newTsFileResource) {
for (String device : newTsFileResource.getDevices()) {
long endTime = newTsFileResource.getEndTime(device);
long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime);
@@ -3170,7 +3170,8 @@ public class DataRegion implements IDataRegionForQuery {
// init map
timePartitionIds[i] =
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
- if
(!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) {
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) {
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
@@ -3398,13 +3399,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- public void releaseFlushTimeMap(long timePartitionId) {
- writeLock("releaseFlushTimeMap");
- try {
- lastFlushTimeMap.removePartition(timePartitionId);
- } finally {
- writeUnlock();
- }
+ /* Be careful, the thread that calls this method may not hold the write
lock!!*/
+ public void degradeFlushTimeMap(long timePartitionId) {
+ lastFlushTimeMap.degradeLastFlushTime(timePartitionId);
}
public long getMemCost() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
new file mode 100644
index 00000000000..4ead2927559
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DeviceLastFlushTime implements ILastFlushTime {
+
+ Map<String, Long> deviceLastFlushTimeMap = new HashMap<>();
+
+ @Override
+ public long getLastFlushTime(String deviceId) {
+ if (deviceLastFlushTimeMap.containsKey(deviceId)) {
+ return deviceLastFlushTimeMap.get(deviceId);
+ }
+ return Long.MIN_VALUE;
+ }
+
+ @Override
+ public void updateLastFlushTime(String deviceId, long time) {
+ deviceLastFlushTimeMap.merge(deviceId, time, Math::max);
+ }
+
+ @Override
+ public ILastFlushTime degradeLastFlushTime() {
+ long maxTime = Long.MIN_VALUE;
+ for (long time : deviceLastFlushTimeMap.values()) {
+ maxTime = Math.max(maxTime, time);
+ }
+ return new PartitionLastFlushTime(maxTime);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index d8215aff763..4e7da930ea8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,13 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class HashLastFlushTimeMap implements ILastFlushTimeMap {
@@ -57,8 +55,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
*
* <p>It is used to separate sequence and unsequence data.
*/
- private final Map<Long, Map<String, Long>>
partitionLatestFlushedTimeForEachDevice =
- new HashMap<>();
+ private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new
ConcurrentHashMap<>();
/**
* global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
@@ -68,52 +65,39 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
*
* <p>It is used to update last cache.
*/
- private final Map<String, Long> globalLatestFlushedTimeForEachDevice = new
HashMap<>();
-
- /** used for recovering flush time from tsfile resource */
- TsFileManager tsFileManager;
+ private final Map<String, Long> globalLatestFlushedTimeForEachDevice = new
ConcurrentHashMap<>();
/** record memory cost of map for each partitionId */
- private final Map<Long, Long> memCostForEachPartition = new HashMap<>();
-
- public HashLastFlushTimeMap(TsFileManager tsFileManager) {
- this.tsFileManager = tsFileManager;
- }
+ private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
+ // For load
@Override
- public void updateOneDeviceFlushedTime(long timePartitionId, String path,
long time) {
- Map<String, Long> flushTimeMapForPartition =
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
- timePartitionId, id -> new HashMap<>());
-
- flushTimeMapForPartition.compute(
- path,
- (k, v) -> {
- if (v == null) {
- v = recoverFlushTime(timePartitionId, path);
- }
- if (v == Long.MIN_VALUE) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * path.length();
- memCostForEachPartition.compute(
- timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 +
memCost);
- return time;
- }
- return Math.max(v, time);
- });
+ public void updateOneDeviceFlushedTime(long timePartitionId, String
deviceId, long time) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new DeviceLastFlushTime());
+ long lastFlushTime = flushTimeMapForPartition.getLastFlushTime(deviceId);
+ if (lastFlushTime == Long.MIN_VALUE) {
+ long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * deviceId.length();
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? memCost : v1 + memCost);
+ }
+ flushTimeMapForPartition.updateLastFlushTime(deviceId, time);
}
+ // For recover
@Override
public void updateMultiDeviceFlushedTime(long timePartitionId, Map<String,
Long> flushedTimeMap) {
- Map<String, Long> flushTimeMapForPartition =
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
- timePartitionId, id -> new HashMap<>());
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new DeviceLastFlushTime());
long memIncr = 0;
for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
- if (!flushTimeMapForPartition.containsKey(entry.getKey())) {
+ if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
memIncr += HASHMAP_NODE_BASIC_SIZE + 2L * entry.getKey().length();
}
- flushTimeMapForPartition.merge(entry.getKey(), entry.getValue(),
Math::max);
+ flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
}
long finalMemIncr = memIncr;
memCostForEachPartition.compute(
@@ -135,19 +119,20 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
@Override
public boolean checkAndCreateFlushedTimePartition(long timePartitionId) {
- if (!partitionLatestFlushedTimeForEachDevice.containsKey(timePartitionId))
{
- partitionLatestFlushedTimeForEachDevice.put(timePartitionId, new
HashMap<>());
+ if (!partitionLatestFlushedTime.containsKey(timePartitionId)) {
+ partitionLatestFlushedTime.put(timePartitionId, new
DeviceLastFlushTime());
return false;
}
return true;
}
+ // For insert
@Override
public void updateLatestFlushTime(long partitionId, Map<String, Long>
updateMap) {
for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(partitionId, id -> new HashMap<>())
- .merge(entry.getKey(), entry.getValue(), Math::max);
+ partitionLatestFlushedTime
+ .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
+ .updateLastFlushTime(entry.getKey(), entry.getValue());
if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(),
Long.MIN_VALUE)
< entry.getValue()) {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
@@ -156,10 +141,8 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
@Override
- public long getFlushedTime(long timePartitionId, String path) {
- return partitionLatestFlushedTimeForEachDevice
- .get(timePartitionId)
- .computeIfAbsent(path, k -> recoverFlushTime(timePartitionId, path));
+ public long getFlushedTime(long timePartitionId, String deviceId) {
+ return
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
}
@Override
@@ -169,7 +152,7 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
@Override
public void clearFlushedTime() {
- partitionLatestFlushedTimeForEachDevice.clear();
+ partitionLatestFlushedTime.clear();
}
@Override
@@ -178,15 +161,10 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
@Override
- public void removePartition(long partitionId) {
- partitionLatestFlushedTimeForEachDevice.remove(partitionId);
- memCostForEachPartition.remove(partitionId);
- }
-
- private long recoverFlushTime(long partitionId, String devicePath) {
- long memCost = HASHMAP_NODE_BASIC_SIZE + 2L * devicePath.length();
- memCostForEachPartition.compute(partitionId, (k, v) -> v == null ? memCost
: v + memCost);
- return tsFileManager.recoverFlushTimeFromTsFileResource(partitionId,
devicePath);
+ public void degradeLastFlushTime(long partitionId) {
+ partitionLatestFlushedTime.computeIfPresent(
+ partitionId, (id, lastFlushTime) ->
lastFlushTime.degradeLastFlushTime());
+ memCostForEachPartition.put(partitionId, (long) Long.BYTES);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
new file mode 100644
index 00000000000..7d24f3bf457
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+public interface ILastFlushTime {
+
+ long getLastFlushTime(String device);
+
+ void updateLastFlushTime(String device, long time);
+
+ ILastFlushTime degradeLastFlushTime();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 97f114e04a6..1f10c560219 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -25,8 +25,8 @@ import java.util.Map;
public interface ILastFlushTimeMap {
// region update
- /** Update partitionLatestFlushedTimeForEachDevice. */
- void updateOneDeviceFlushedTime(long timePartitionId, String path, long
time);
+ /** Update partitionLatestFlushedTime. */
+ void updateOneDeviceFlushedTime(long timePartitionId, String deviceId, long
time);
void updateMultiDeviceFlushedTime(long timePartitionId, Map<String, Long>
flushedTimeMap);
@@ -35,9 +35,7 @@ public interface ILastFlushTimeMap {
void updateMultiDeviceGlobalFlushedTime(Map<String, Long>
globalFlushedTimeMap);
- /**
- * Update both partitionLatestFlushedTimeForEachDevice and
globalLatestFlushedTimeForEachDevice.
- */
+ /** Update both partitionLatestFlushedTime and
globalLatestFlushedTimeForEachDevice. */
void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
// endregion
@@ -47,7 +45,7 @@ public interface ILastFlushTimeMap {
// endregion
// region read
- long getFlushedTime(long timePartitionId, String path);
+ long getFlushedTime(long timePartitionId, String deviceId);
long getGlobalFlushedTime(String path);
// endregion
@@ -58,7 +56,7 @@ public interface ILastFlushTimeMap {
void clearGlobalFlushedTime();
// endregion
- void removePartition(long partitionId);
+ void degradeLastFlushTime(long partitionId);
long getMemSize(long partitionId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
new file mode 100644
index 00000000000..59da5f7c509
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion;
+
+public class PartitionLastFlushTime implements ILastFlushTime {
+
+ private long partitionLastFlushTime;
+
+ public PartitionLastFlushTime(long initPartitionLastFlushTime) {
+ this.partitionLastFlushTime = initPartitionLastFlushTime;
+ }
+
+ @Override
+ public long getLastFlushTime(String device) {
+ return partitionLastFlushTime;
+ }
+
+ @Override
+ public void updateLastFlushTime(String device, long time) {
+ partitionLastFlushTime = Math.max(partitionLastFlushTime, time);
+ }
+
+ @Override
+ public ILastFlushTime degradeLastFlushTime() {
+ return this;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 8619ed77857..b40093beb56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import java.io.IOException;
@@ -136,41 +135,6 @@ public class TsFileManager {
}
}
- public long recoverFlushTimeFromTsFileResource(long partitionId, String
devicePath) {
- long lastFlushTime = Long.MIN_VALUE;
- writeLock("recoverFlushTimeFromTsFileResource");
- try {
- List<TsFileResource> seqTsFileResourceList =
- sequenceFiles.computeIfAbsent(partitionId, l -> new
TsFileResourceList());
- for (int i = seqTsFileResourceList.size() - 1; i >= 0; i--) {
- TsFileResource seqResource = seqTsFileResourceList.get(i);
- if (!seqResource.isClosed()) {
- continue;
- }
- if (seqResource.getTimeIndexType() ==
ITimeIndex.DEVICE_TIME_INDEX_TYPE) {
- Set<String> deviceSet = seqResource.getDevices();
- if (deviceSet.contains(devicePath)) {
- lastFlushTime = Math.max(lastFlushTime,
seqResource.getEndTime(devicePath));
- break;
- }
- } else {
- lastFlushTime = Math.max(lastFlushTime,
seqResource.getEndTime(devicePath));
- }
- }
- List<TsFileResource> unseqTsFileResourceList =
- unsequenceFiles.computeIfAbsent(partitionId, l -> new
TsFileResourceList());
- for (TsFileResource resource : unseqTsFileResourceList) {
- if (resource.definitelyNotContains(devicePath)) {
- continue;
- }
- lastFlushTime = Math.max(lastFlushTime,
resource.getEndTime(devicePath));
- }
- } finally {
- writeUnlock();
- }
- return lastFlushTime;
- }
-
public Iterator<TsFileResource> getIterator(boolean sequence) {
readLock();
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionInfo.java
index daa2ff0c038..aa602fad9d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionInfo.java
@@ -55,6 +55,10 @@ public class TimePartitionInfo {
if (cmp != 0) {
return cmp;
}
+ cmp = Long.compare(partitionId, timePartitionInfo.partitionId);
+ if (cmp != 0) {
+ return cmp;
+ }
cmp = Boolean.compare(isLatestPartition,
timePartitionInfo.isLatestPartition);
if (cmp != 0) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
index 3f3b040b3d1..55aeec94113 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java
@@ -92,7 +92,7 @@ public class TimePartitionManager {
timePartitionInfo.memSize = memSize;
timePartitionInfo.isActive = isActive;
if (memCost > timePartitionInfoMemoryThreshold) {
- evictOldPartition();
+ degradeLastFlushTime();
}
}
}
@@ -110,7 +110,7 @@ public class TimePartitionManager {
}
}
- private void evictOldPartition() {
+ private void degradeLastFlushTime() {
TreeSet<TimePartitionInfo> treeSet = new
TreeSet<>(TimePartitionInfo::comparePriority);
synchronized (timePartitionInfoMap) {
for (Map.Entry<DataRegionId, Map<Long, TimePartitionInfo>> entry :
@@ -123,13 +123,13 @@ public class TimePartitionManager {
if (timePartitionInfo == null) {
return;
}
- memCost -= timePartitionInfo.memSize;
+ memCost -= timePartitionInfo.memSize + Long.BYTES;
DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(timePartitionInfo.dataRegionId);
if (dataRegion != null) {
- dataRegion.releaseFlushTimeMap(timePartitionInfo.partitionId);
+ dataRegion.degradeFlushTimeMap(timePartitionInfo.partitionId);
logger.info(
- "[{}]evict LastFlushTimeMap of old TimePartitionInfo-{}, mem
size is {}, remaining mem cost is {}",
+ "[{}]degrade LastFlushTimeMap of old TimePartitionInfo-{}, mem
size is {}, remaining mem cost is {}",
timePartitionInfo.dataRegionId,
timePartitionInfo.partitionId,
timePartitionInfo.memSize,
@@ -142,20 +142,7 @@ public class TimePartitionManager {
}
}
- public void removePartition(DataRegionId dataRegionId, long partitionId) {
- synchronized (timePartitionInfoMap) {
- Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
- timePartitionInfoMap.get(dataRegionId);
- if (timePartitionInfoMapForDataRegion != null) {
- TimePartitionInfo timePartitionInfo =
timePartitionInfoMapForDataRegion.get(partitionId);
- if (timePartitionInfo != null) {
- timePartitionInfoMapForDataRegion.remove(partitionId);
- memCost -= timePartitionInfo.memSize;
- }
- }
- }
- }
-
+ @TestOnly
public TimePartitionInfo getTimePartitionInfo(DataRegionId dataRegionId,
long timePartitionId) {
synchronized (timePartitionInfoMap) {
Map<Long, TimePartitionInfo> timePartitionInfoMapForDataRegion =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
index 55ce6ef22f6..b7d4766a4a8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/LastFlushTimeMapTest.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.StorageEngine;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -41,7 +40,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.util.List;
public class LastFlushTimeMapTest {
@@ -71,7 +69,7 @@ public class LastFlushTimeMapTest {
}
@Test
- public void testRecoverLastFlushTimeMapFromDeviceTimeIndex()
+ public void testDeviceLastFlushTimeMap()
throws IOException, IllegalPathException, WriteProcessException {
TSRecord record = new TSRecord(10000, "root.vehicle.d0");
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
@@ -94,15 +92,10 @@ public class LastFlushTimeMapTest {
}
Assert.assertEquals(
10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
-
- dataRegion.getLastFlushTimeMap().clearFlushedTime();
- dataRegion.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0);
- Assert.assertEquals(
- 10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
}
@Test
- public void testRecoverLastFlushTimeMapFromFileTimeIndex()
+ public void testPartitionLastFlushTimeMap()
throws IOException, IllegalPathException, WriteProcessException {
TSRecord record = new TSRecord(10000, "root.vehicle.d0");
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
@@ -126,17 +119,10 @@ public class LastFlushTimeMapTest {
dataRegion.syncCloseAllWorkingTsFileProcessors();
Assert.assertEquals(
10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
+ Assert.assertEquals(
+ 9999, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d1"));
- dataRegion.getLastFlushTimeMap().clearFlushedTime();
- dataRegion.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0);
- List<TsFileResource> seqs = dataRegion.getSequenceFileList();
- for (TsFileResource res : seqs) {
- res.degradeTimeIndex();
- }
- List<TsFileResource> unseqs = dataRegion.getUnSequenceFileList();
- for (TsFileResource res : unseqs) {
- res.degradeTimeIndex();
- }
+ dataRegion.getLastFlushTimeMap().degradeLastFlushTime(0);
Assert.assertEquals(
10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
Assert.assertEquals(
@@ -144,57 +130,34 @@ public class LastFlushTimeMapTest {
}
@Test
- public void
testRecoverDeviceLastFlushedTimeWhenLargestTimestampInUnSeqSpace() {
- String seqDirPath =
- TestConstant.BASE_OUTPUT_PATH
- + "data"
- + File.separator
- + "sequence"
- + File.separator
- + "root.testsg"
- + File.separator
- + "0"
- + File.separator
- + "0";
+ public void testLastFlushedTimeWhenLargestTimestampInUnSeqSpace()
+ throws IllegalPathException, WriteProcessException {
String unseqDirPath =
TestConstant.BASE_OUTPUT_PATH
+ "data"
+ File.separator
+ "unsequence"
+ File.separator
- + "root.testsg"
+ + "root.vehicle"
+ File.separator
+ "0"
+ File.separator
+ "0";
- String device = "root.testsg.d1";
- File seqResourceFile1 = new File(seqDirPath + File.separator +
"1-1-0-0.tsfile.resource");
- TsFileResource seqResource1 = new TsFileResource();
- seqResource1.setFile(seqResourceFile1);
- seqResource1.setTimeIndex(new DeviceTimeIndex());
- seqResource1.updateStartTime(device, 10);
- seqResource1.updateEndTime(device, 20);
-
- File seqResourceFile2 = new File(seqDirPath + File.separator +
"2-2-0-0.tsfile.resource");
- TsFileResource seqResource2 = new TsFileResource();
- seqResource2.setTimeIndex(new DeviceTimeIndex());
- seqResource2.setFile(seqResourceFile2);
- seqResource2.updateStartTime(device, 30);
- seqResource2.updateEndTime(device, 40);
-
- File seqResourceFile3 = new File(seqDirPath + File.separator +
"3-3-0-0.tsfile.resource");
- TsFileResource seqResource3 = new TsFileResource();
- seqResource3.setTimeIndex(new DeviceTimeIndex());
- seqResource3.setFile(seqResourceFile3);
- seqResource3.updateStartTime(device, 50);
- seqResource3.updateEndTime(device, 60);
+ for (int j = 1; j <= 10; j++) {
+ TSRecord record = new TSRecord(j, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ }
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ String device = "root.vehicle.d0";
File unseqResourceFile1 = new File(unseqDirPath + File.separator +
"4-4-0-0.tsfile.resource");
TsFileResource unseqResource1 = new TsFileResource();
unseqResource1.setTimeIndex(new DeviceTimeIndex());
unseqResource1.setFile(unseqResourceFile1);
unseqResource1.updateStartTime(device, 1);
unseqResource1.updateEndTime(device, 100);
+ dataRegion.updateLastFlushTime(unseqResource1);
File unseqResourceFile2 = new File(unseqDirPath + File.separator +
"5-5-0-0.tsfile.resource");
TsFileResource unseqResource2 = new TsFileResource();
@@ -202,6 +165,7 @@ public class LastFlushTimeMapTest {
unseqResource2.setFile(unseqResourceFile2);
unseqResource2.updateStartTime(device, 1);
unseqResource2.updateEndTime(device, 10);
+ dataRegion.updateLastFlushTime(unseqResource2);
File unseqResourceFile3 = new File(unseqDirPath + File.separator +
"6-6-0-0.tsfile.resource");
TsFileResource unseqResource3 = new TsFileResource();
@@ -209,18 +173,10 @@ public class LastFlushTimeMapTest {
unseqResource3.setFile(unseqResourceFile3);
unseqResource3.updateStartTime(device, 1);
unseqResource3.updateEndTime(device, 70);
+ dataRegion.updateLastFlushTime(unseqResource3);
- TsFileManager tsFileManager = dataRegion.getTsFileManager();
- tsFileManager.add(seqResource1, true);
- tsFileManager.add(seqResource2, true);
- tsFileManager.add(seqResource3, true);
- tsFileManager.add(unseqResource1, false);
- tsFileManager.add(unseqResource2, false);
- tsFileManager.add(unseqResource3, false);
-
- dataRegion.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0);
-
Assert.assertEquals(100,
dataRegion.getLastFlushTimeMap().getFlushedTime(0, device));
- tsFileManager.clear();
+ dataRegion.getLastFlushTimeMap().degradeLastFlushTime(0);
+ Assert.assertEquals(100,
dataRegion.getLastFlushTimeMap().getFlushedTime(0, device));
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManagerTest.java
index e88f2d6dd09..90dc24e89a5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManagerTest.java
@@ -126,4 +126,22 @@ public class TimePartitionManagerTest {
Assert.assertNull(timePartitionManager.getTimePartitionInfo(new
DataRegionId(3), 0L));
}
+
+ @Test
+ public void testCompareTimePartitionInfo() {
+ TimePartitionInfo timePartitionInfo =
+ new TimePartitionInfo(new DataRegionId(1), 0L, true, 100, 0, true);
+ TimePartitionInfo timePartitionInfo1 =
+ new TimePartitionInfo(new DataRegionId(1), 0L, false, 100, 0, true);
+ Assert.assertEquals(1,
timePartitionInfo.comparePriority(timePartitionInfo1));
+ TimePartitionInfo timePartitionInfo2 =
+ new TimePartitionInfo(new DataRegionId(1), 1L, true, 100, 0, true);
+ Assert.assertEquals(-1,
timePartitionInfo.comparePriority(timePartitionInfo2));
+ TimePartitionInfo timePartitionInfo3 =
+ new TimePartitionInfo(new DataRegionId(1), 0L, true, 100, 0, false);
+ Assert.assertEquals(1,
timePartitionInfo.comparePriority(timePartitionInfo3));
+ TimePartitionInfo timePartitionInfo4 =
+ new TimePartitionInfo(new DataRegionId(1), 0L, true, 101, 0, true);
+ Assert.assertEquals(-1,
timePartitionInfo.comparePriority(timePartitionInfo4));
+ }
}