This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 1652f10b784 [To rel/1.1] Fix recover last flush time map bug (#10764)
1652f10b784 is described below
commit 1652f10b784e6ba7e71f56e2a34bf68ac08152f3
Author: Haonan <[email protected]>
AuthorDate: Fri Aug 4 17:56:38 2023 +0800
[To rel/1.1] Fix recover last flush time map bug (#10764)
---
.../performer/impl/FastCompactionPerformer.java | 2 +-
.../iotdb/db/engine/storagegroup/DataRegion.java | 2 +-
.../engine/storagegroup/HashLastFlushTimeMap.java | 6 +-
.../storagegroup/IDTableLastFlushTimeMap.java | 5 +-
.../db/engine/storagegroup/TsFileResource.java | 11 +-
.../storagegroup/timeindex/DeviceTimeIndex.java | 4 +-
.../storagegroup/timeindex/FileTimeIndex.java | 4 +-
.../engine/storagegroup/timeindex/ITimeIndex.java | 7 +-
.../storagegroup/timeindex/V012FileTimeIndex.java | 4 +-
.../engine/storagegroup/LastFlushTimeMapTest.java | 315 ++++++++-------------
10 files changed, 140 insertions(+), 220 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionPerformer.java
index 3319594effa..4dc6df9618e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -120,7 +120,7 @@ public class FastCompactionPerformer
// actually exist but the judgment return device being existed.
sortedSourceFiles.addAll(seqFiles);
sortedSourceFiles.addAll(unseqFiles);
- sortedSourceFiles.removeIf(x -> !x.mayContainsDevice(device));
+ sortedSourceFiles.removeIf(x -> x.definitelyNotContains(device));
sortedSourceFiles.sort(Comparator.comparingLong(x ->
x.getStartTime(device)));
compactionWriter.startChunkGroup(device, isAligned);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 8725b1ea554..35aa650831a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1998,7 +1998,7 @@ public class DataRegion implements IDataRegionForQuery {
deviceEndTime = startAndEndTime.getRight();
} else {
String deviceId = device.getFullPath();
- if (!tsFileResource.mayContainsDevice(deviceId)) {
+ if (tsFileResource.definitelyNotContains(deviceId)) {
// resource does not contain this device
continue;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index 50d524a3e5b..1dfb5a9a9de 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class HashLastFlushTimeMap implements ILastFlushTimeMap {
@@ -228,8 +229,9 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
tsFileManager.getOrCreateSequenceListByTimePartition(partitionId);
for (int i = tsFileResourceList.size() - 1; i >= 0; i--) {
- if (tsFileResourceList.get(i).timeIndex.mayContainsDevice(devicePath)) {
- return tsFileResourceList.get(i).timeIndex.getEndTime(devicePath);
+ Set<String> deviceSet = tsFileResourceList.get(i).getDevices();
+ if (deviceSet.contains(devicePath)) {
+ return tsFileResourceList.get(i).getEndTime(devicePath);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 03dd87476ab..dd75dc2c636 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -167,8 +167,9 @@ public class IDTableLastFlushTimeMap implements
ILastFlushTimeMap {
tsFileManager.getOrCreateSequenceListByTimePartition(partitionId);
for (int i = tsFileResourceList.size() - 1; i >= 0; i--) {
- if (tsFileResourceList.get(i).timeIndex.mayContainsDevice(devicePath)) {
- return tsFileResourceList.get(i).timeIndex.getEndTime(devicePath);
+ Set<String> devices = tsFileResourceList.get(i).getDevices();
+ if (devices.contains(devicePath)) {
+ return tsFileResourceList.get(i).getEndTime(devicePath);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 550cbe6c838..4c5c7918115 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -443,11 +443,12 @@ public class TsFileResource {
}
/**
- * Whether this TsFileResource contains this device, if false, it must not
contain this device, if
- * true, it may or may not contain this device
+ * Whether this TsFile definitely not contains this device, if ture, it must
not contain this
+ * device, if false, it may or may not contain this device Notice: using
method be CAREFULLY and
+ * you really understand the meaning!!!!!
*/
- public boolean mayContainsDevice(String device) {
- return timeIndex.mayContainsDevice(device);
+ public boolean definitelyNotContains(String device) {
+ return timeIndex.definitelyNotContains(device);
}
/**
@@ -750,7 +751,7 @@ public class TsFileResource {
/** @return true if the device is contained in the TsFile */
public boolean isSatisfied(String deviceId, Filter timeFilter, boolean
isSeq, boolean debug) {
- if (!mayContainsDevice(deviceId)) {
+ if (definitelyNotContains(deviceId)) {
if (debug) {
DEBUG_LOGGER.info(
"Path: {} file {} is not satisfied because of no device!",
deviceId, file);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index 2d50171bcb1..cc141ce2b56 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -361,8 +361,8 @@ public class DeviceTimeIndex implements ITimeIndex {
}
@Override
- public boolean mayContainsDevice(String device) {
- return deviceToIndex.containsKey(device);
+ public boolean definitelyNotContains(String device) {
+ return !deviceToIndex.containsKey(device);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index ed7b6bf18e1..b5b5535bd53 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -224,8 +224,8 @@ public class FileTimeIndex implements ITimeIndex {
}
@Override
- public boolean mayContainsDevice(String device) {
- return true;
+ public boolean definitelyNotContains(String device) {
+ return false;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index 8db4b765302..e86918c78c9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -189,10 +189,11 @@ public interface ITimeIndex {
int compareDegradePriority(ITimeIndex timeIndex);
/**
- * Whether this TsFile contains this device, if false, it must not contain
this device, if true,
- * it may or may not contain this device
+ * Whether this TsFile definitely not contains this device, if ture, it must
not contain this
+ * device, if false, it may or may not contain this device Notice: using
method be CAREFULLY and
+ * you really understand the meaning!!!!!
*/
- boolean mayContainsDevice(String device);
+ boolean definitelyNotContains(String device);
/**
* @return null if the deviceId doesn't exist, otherwise index 0 is
startTime, index 1 is endTime
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
index 2abca475472..a84247265b5 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
@@ -180,9 +180,9 @@ public class V012FileTimeIndex implements ITimeIndex {
}
@Override
- public boolean mayContainsDevice(String device) {
+ public boolean definitelyNotContains(String device) {
throw new UnsupportedOperationException(
- "V012FileTimeIndex should be rewritten while upgrading and
containsDevice() method should not be called any more.");
+ "V012FileTimeIndex should be rewritten while upgrading and
definitelyNotContains() method should not be called any more.");
}
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
index 2d945c67222..7495a3a39c5 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/LastFlushTimeMapTest.java
@@ -19,205 +19,120 @@
package org.apache.iotdb.db.engine.storagegroup;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
public class LastFlushTimeMapTest {
- // protected PlanExecutor executor = new PlanExecutor();
- //
- // protected final Planner processor = new Planner();
- //
- // public LastFlushTimeMapTest() throws QueryProcessException {}
- //
- // @Before
- // public void before() {
- //
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
- // EnvironmentUtils.envSetUp();
- // }
- //
- // @Test
- // public void testSequenceInsert()
- // throws MetadataException, QueryProcessException,
StorageEngineException {
- // insertData(0);
- // insertData(10);
- // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
- // executor.processNonQuery(flushPlan);
- //
- // insertData(20);
- //
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.isp.d1"));
- // assertEquals(2, storageGroupProcessor.getSequenceFileList().size());
- // assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
- // }
- //
- // @Test
- // public void testUnSequenceInsert()
- // throws MetadataException, QueryProcessException,
StorageEngineException {
- // insertData(100);
- // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
- // executor.processNonQuery(flushPlan);
- //
- // insertData(20);
- //
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.isp.d1"));
- // assertEquals(1, storageGroupProcessor.getSequenceFileList().size());
- // assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
- // }
- //
- // @Test
- // public void testSequenceAndUnSequenceInsert()
- // throws MetadataException, QueryProcessException,
StorageEngineException {
- // // sequence
- // insertData(100);
- // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
- // executor.processNonQuery(flushPlan);
- //
- // // sequence
- // insertData(120);
- // executor.processNonQuery(flushPlan);
- //
- // // unsequence
- // insertData(20);
- // // sequence
- // insertData(130);
- // executor.processNonQuery(flushPlan);
- //
- // // sequence
- // insertData(150);
- // // unsequence
- // insertData(90);
- //
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.isp.d1"));
- // assertEquals(4, storageGroupProcessor.getSequenceFileList().size());
- // assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
- // assertEquals(1,
storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
- // assertEquals(1,
storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
- // }
- //
- // @Test
- // public void testDeletePartition()
- // throws MetadataException, QueryProcessException,
StorageEngineException {
- // insertData(100);
- // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
- // executor.processNonQuery(flushPlan);
- // insertData(20);
- // insertData(120);
- //
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.isp.d1"));
- //
- // assertEquals(
- // 103L,
storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L, "root.isp.d1"));
- // assertEquals(
- // 103L,
storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
- //
- // // delete time partition
- // Set<Long> deletedPartition = new HashSet<>();
- // deletedPartition.add(0L);
- // DeletePartitionPlan deletePartitionPlan =
- // new DeletePartitionPlan(new PartialPath("root.isp"),
deletedPartition);
- // executor.processNonQuery(deletePartitionPlan);
- //
- // assertEquals(
- // 123L,
storageGroupProcessor.getLastFlushTimeMap().getGlobalFlushedTime("root.isp.d1"));
- // }
- //
- // @Test
- // public void testMemoryCalculation()
- // throws QueryProcessException, IllegalPathException,
StorageEngineException,
- // StorageGroupNotSetException {
- // insertRecord("root.sg.d1", 100L);
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.sg"));
- // assertEquals(98l,
storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
- //
- // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L,
"root.sg.d100");
- // storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0L,
"root.sg.d101");
- // assertEquals(302l,
storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
- //
- //
storageGroupProcessor.getLastFlushTimeMap().setOneDeviceFlushedTime(0L,
"root.sg.d102", 0L);
- // HashMap<String, Long> updateMap = new HashMap<>();
- // updateMap.put("root.sg.d103", 1L);
- // updateMap.put("root.sg.d100", 1L);
- //
storageGroupProcessor.getLastFlushTimeMap().setMultiDeviceFlushedTime(0L,
updateMap);
- // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L,
"root.sg.d103", 2L);
- // storageGroupProcessor.getLastFlushTimeMap().updateFlushedTime(0L,
"root.sg.d104", 2L);
- // assertEquals(608L,
storageGroupProcessor.getLastFlushTimeMap().getMemSize(0L));
- // }
- //
- // @Test
- // public void testRecoverFlushTime()
- // throws QueryProcessException, IllegalPathException,
StorageEngineException,
- // StorageGroupNotSetException {
- // insertData(100);
- // PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
- // executor.processNonQuery(flushPlan);
- // DataRegion storageGroupProcessor =
- // StorageEngine.getInstance().getProcessor(new
PartialPath("root.isp"));
- // assertEquals(
- // 103L,
storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
- //
- // storageGroupProcessor.getLastFlushTimeMap().removePartition(0l);
- //
storageGroupProcessor.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0l);
- // assertEquals(
- // 103L,
storageGroupProcessor.getLastFlushTimeMap().getFlushedTime(0l, "root.isp.d1"));
- // }
- //
- // @After
- // public void clean() throws IOException, StorageEngineException {
- // EnvironmentUtils.cleanEnv();
- // }
- //
- // protected void insertData(long initTime) throws IllegalPathException,
QueryProcessException {
- //
- // long[] times = new long[] {initTime, initTime + 1, initTime + 2,
initTime + 3};
- // List<Integer> dataTypes = new ArrayList<>();
- // dataTypes.add(TSDataType.DOUBLE.ordinal());
- // dataTypes.add(TSDataType.FLOAT.ordinal());
- // dataTypes.add(TSDataType.INT64.ordinal());
- // dataTypes.add(TSDataType.INT32.ordinal());
- // dataTypes.add(TSDataType.BOOLEAN.ordinal());
- // dataTypes.add(TSDataType.TEXT.ordinal());
- //
- // Object[] columns = new Object[6];
- // columns[0] = new double[4];
- // columns[1] = new float[4];
- // columns[2] = new long[4];
- // columns[3] = new int[4];
- // columns[4] = new boolean[4];
- // columns[5] = new Binary[4];
- //
- // for (int r = 0; r < 4; r++) {
- // ((double[]) columns[0])[r] = 10.0 + r;
- // ((float[]) columns[1])[r] = 20 + r;
- // ((long[]) columns[2])[r] = 100000 + r;
- // ((int[]) columns[3])[r] = 1000 + r;
- // ((boolean[]) columns[4])[r] = false;
- // ((Binary[]) columns[5])[r] = new Binary("mm" + r);
- // }
- //
- // InsertTabletPlan tabletPlan =
- // new InsertTabletPlan(
- // new PartialPath("root.isp.d1"),
- // new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
- // dataTypes);
- // tabletPlan.setTimes(times);
- // tabletPlan.setColumns(columns);
- // tabletPlan.setRowCount(times.length);
- //
- // executor.insertTablet(tabletPlan);
- // }
- //
- // protected void insertRecord(String devicePath, long time)
- // throws IllegalPathException, QueryProcessException {
- // InsertRowPlan insertRowPlan =
- // new InsertRowPlan(
- // new PartialPath(devicePath),
- // time,
- // new String[] {"s1", "s2", "s3"},
- // new TSDataType[] {TSDataType.INT32, TSDataType.INT32,
TSDataType.INT32},
- // new String[] {"1", "1", "1"});
- //
- // executor.insert(insertRowPlan);
- // }
+
+ private DataRegion dataRegion;
+
+ private String storageGroup = "root.vehicle.d0";
+ private String measurementId = "s0";
+ private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ dataRegion = new DataRegionTest.DummyDataRegion(systemDir, storageGroup);
+ StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion);
+ CompactionTaskManager.getInstance().start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (dataRegion != null) {
+ dataRegion.syncDeleteDataFiles();
+ StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
+ }
+ EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
+ CompactionTaskManager.getInstance().stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRecoverLastFlushTimeMapFromDeviceTimeIndex()
+ throws IOException, IllegalPathException, WriteProcessException {
+ TSRecord record = new TSRecord(10000, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ record = new TSRecord(9999, "root.vehicle.d1");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ for (int j = 1; j <= 10; j++) {
+ record = new TSRecord(j, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ }
+
+ for (TsFileProcessor tsfileProcessor :
dataRegion.getWorkUnsequenceTsFileProcessors()) {
+ tsfileProcessor.syncFlush();
+ }
+ Assert.assertEquals(
+ 10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
+
+ dataRegion.getLastFlushTimeMap().clearFlushedTime();
+ dataRegion.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0);
+ Assert.assertEquals(
+ 10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
+ }
+
+ @Test
+ public void testRecoverLastFlushTimeMapFromFileTimeIndex()
+ throws IOException, IllegalPathException, WriteProcessException {
+ TSRecord record = new TSRecord(10000, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ record = new TSRecord(9999, "root.vehicle.d1");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(1000)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ for (int j = 1; j <= 10; j++) {
+ record = new TSRecord(j, "root.vehicle.d0");
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(DataRegionTest.buildInsertRowNodeByTSRecord(record));
+ }
+
+ for (TsFileProcessor tsfileProcessor :
dataRegion.getWorkUnsequenceTsFileProcessors()) {
+ tsfileProcessor.syncFlush();
+ }
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertEquals(
+ 10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
+
+ dataRegion.getLastFlushTimeMap().clearFlushedTime();
+ dataRegion.getLastFlushTimeMap().checkAndCreateFlushedTimePartition(0);
+ List<TsFileResource> seqs = dataRegion.getSequenceFileList();
+ for (TsFileResource res : seqs) {
+ res.degradeTimeIndex();
+ }
+ List<TsFileResource> unseqs = dataRegion.getUnSequenceFileList();
+ for (TsFileResource res : unseqs) {
+ res.degradeTimeIndex();
+ }
+ Assert.assertEquals(
+ 10000, dataRegion.getLastFlushTimeMap().getFlushedTime(0,
"root.vehicle.d0"));
+ }
}