This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 897ed48ec9 [IOTDB-3247] Recover aligned sensors after deleting
timeseries, query lost data (#6468)
897ed48ec9 is described below
commit 897ed48ec90652bf5ed3b13b424993e75381a280
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jun 30 22:09:17 2022 +0800
[IOTDB-3247] Recover aligned sensors after deleting timeseries, query lost
data (#6468)
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 35 ++-
.../engine/memtable/AlignedWritableMemChunk.java | 26 +-
.../db/wal/recover/file/TsFilePlanRedoerTest.java | 303 +++++++++++++++++++++
3 files changed, 340 insertions(+), 24 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 06726f3244..2b6d39d32b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -56,7 +56,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
public abstract class AbstractMemTable implements IMemTable {
/** each memTable node has a unique int value identifier, init when
recovering wal */
@@ -128,7 +130,7 @@ public abstract class AbstractMemTable implements IMemTable
{
IWritableMemChunkGroup memChunkGroup =
memTableMap.computeIfAbsent(deviceId, k -> new
WritableMemChunkGroup());
for (IMeasurementSchema schema : schemaList) {
- if (!memChunkGroup.contains(schema.getMeasurementId())) {
+ if (schema != null &&
!memChunkGroup.contains(schema.getMeasurementId())) {
seriesNumber++;
totalPointsNumThreshold += avgSeriesPointNumThreshold;
}
@@ -144,10 +146,11 @@ public abstract class AbstractMemTable implements
IMemTable {
k -> {
seriesNumber += schemaList.size();
totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) *
schemaList.size();
- return new AlignedWritableMemChunkGroup(schemaList);
+ return new AlignedWritableMemChunkGroup(
+
schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()));
});
for (IMeasurementSchema schema : schemaList) {
- if (!memChunkGroup.contains(schema.getMeasurementId())) {
+ if (schema != null &&
!memChunkGroup.contains(schema.getMeasurementId())) {
seriesNumber++;
totalPointsNumThreshold += avgSeriesPointNumThreshold;
}
@@ -275,6 +278,7 @@ public abstract class AbstractMemTable implements IMemTable
{
for (int i = 0; i < insertRowPlan.getMeasurements().length; i++) {
// use measurements[i] to ignore failed partial insert
if (measurements[i] == null) {
+ schemaList.add(null);
continue;
}
IMeasurementSchema schema =
insertRowPlan.getMeasurementMNodes()[i].getSchema();
@@ -318,6 +322,7 @@ public abstract class AbstractMemTable implements IMemTable
{
for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
// use measurements[i] to ignore failed partial insert
if (measurements[i] == null) {
+ schemaList.add(null);
continue;
}
IMeasurementSchema schema = insertRowNode.getMeasurementSchemas()[i];
@@ -481,10 +486,10 @@ public abstract class AbstractMemTable implements
IMemTable {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
if (insertTabletPlan.getColumns()[i] == null) {
- continue;
+ schemaList.add(null);
+ } else {
+ schemaList.add(insertTabletPlan.getMeasurementMNodes()[i].getSchema());
}
- IMeasurementSchema schema =
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
- schemaList.add(schema);
}
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(),
schemaList);
@@ -507,10 +512,10 @@ public abstract class AbstractMemTable implements
IMemTable {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
if (insertTabletNode.getColumns()[i] == null) {
- continue;
+ schemaList.add(null);
+ } else {
+ schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
}
- IMeasurementSchema schema = insertTabletNode.getMeasurementSchemas()[i];
- schemaList.add(schema);
}
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(),
schemaList);
@@ -533,10 +538,10 @@ public abstract class AbstractMemTable implements
IMemTable {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
if (insertTabletPlan.getColumns()[i] == null) {
- continue;
+ schemaList.add(null);
+ } else {
+ schemaList.add(insertTabletPlan.getMeasurementMNodes()[i].getSchema());
}
- IMeasurementSchema schema =
insertTabletPlan.getMeasurementMNodes()[i].getSchema();
- schemaList.add(schema);
}
if (schemaList.isEmpty()) {
return;
@@ -562,10 +567,10 @@ public abstract class AbstractMemTable implements
IMemTable {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
if (insertTabletNode.getColumns()[i] == null) {
- continue;
+ schemaList.add(null);
+ } else {
+ schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
}
- IMeasurementSchema schema = insertTabletNode.getMeasurementSchemas()[i];
- schemaList.add(schema);
}
if (schemaList.isEmpty()) {
return;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index d40915792b..dbcbc5c207 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -182,22 +182,30 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
}
+ /**
+ * Check the schemas of the columns and return an array that maps each existing schema to
the index of its data column
+ *
+ * @param schemaListInInsertPlan Contains all existing schemas in the InsertPlan.
If a timeseries
+ * has been deleted, there will be a null in its slot.
+ * @return columnIndexArray: schemaList[i] is schema of
columns[columnIndexArray[i]]
+ */
private int[] checkColumnsInInsertPlan(List<IMeasurementSchema>
schemaListInInsertPlan) {
Map<String, Integer> measurementIdsInInsertPlan = new HashMap<>();
for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
-
measurementIdsInInsertPlan.put(schemaListInInsertPlan.get(i).getMeasurementId(),
i);
- if
(!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
- this.measurementIndexMap.put(
- schemaListInInsertPlan.get(i).getMeasurementId(),
measurementIndexMap.size());
- this.schemaList.add(schemaListInInsertPlan.get(i));
- this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+ if (schemaListInInsertPlan.get(i) != null) {
+
measurementIdsInInsertPlan.put(schemaListInInsertPlan.get(i).getMeasurementId(),
i);
+ if
(!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) {
+ this.measurementIndexMap.put(
+ schemaListInInsertPlan.get(i).getMeasurementId(),
measurementIndexMap.size());
+ this.schemaList.add(schemaListInInsertPlan.get(i));
+ this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+ }
}
}
int[] columnIndexArray = new int[measurementIndexMap.size()];
measurementIndexMap.forEach(
- (measurementId, i) -> {
- columnIndexArray[i] =
measurementIdsInInsertPlan.getOrDefault(measurementId, -1);
- });
+ (measurementId, i) ->
+ columnIndexArray[i] =
measurementIdsInInsertPlan.getOrDefault(measurementId, -1));
return columnIndexArray;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
index 1be4ef012a..feab7d9d82 100644
---
a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -32,18 +34,22 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.utils.TsFileUtilsForRecoverTest;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -53,6 +59,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -64,13 +71,25 @@ public class TsFilePlanRedoerTest {
private static final String SG_NAME = "root.recover_sg";
private static final String DEVICE1_NAME = SG_NAME.concat(".d1");
private static final String DEVICE2_NAME = SG_NAME.concat(".d2");
+ private static final String DEVICE3_NAME = SG_NAME.concat(".d3");
private static final String FILE_NAME =
TsFileUtilsForRecoverTest.getTestTsFilePath(SG_NAME, 0, 0, 1);
private TsFileResource tsFileResource;
+ private CompressionType compressionType;
+ boolean prevIsAutoCreateSchemaEnabled;
+ boolean prevIsEnablePartialInsert;
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
+
+ // set recover config, avoid creating deleted time series when recovering
wal
+ prevIsAutoCreateSchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ prevIsEnablePartialInsert =
IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
+ IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
+ compressionType =
TSFileDescriptor.getInstance().getConfig().getCompressor();
IoTDB.schemaProcessor.setStorageGroup(new PartialPath(SG_NAME));
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(DEVICE1_NAME.concat(".s1")),
@@ -96,6 +115,19 @@ public class TsFilePlanRedoerTest {
TSEncoding.RLE,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
+ IoTDB.schemaProcessor.createAlignedTimeSeries(
+ new PartialPath(DEVICE3_NAME),
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.BOOLEAN,
+ TSDataType.FLOAT,
+ TSDataType.TEXT),
+ Arrays.asList(
+ TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE,
TSEncoding.PLAIN),
+ Arrays.asList(
+ compressionType, compressionType, compressionType,
compressionType, compressionType));
}
@After
@@ -104,6 +136,11 @@ public class TsFilePlanRedoerTest {
tsFileResource.close();
}
EnvironmentUtils.cleanEnv();
+ // reset config
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
+
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(prevIsEnablePartialInsert);
}
@Test
@@ -161,6 +198,64 @@ public class TsFilePlanRedoerTest {
assertEquals(6, time);
}
+ @Test
+ public void testRedoInsertAlignedRowPlan() throws Exception {
+ // generate .tsfile and update resource in memory
+ File file = new File(FILE_NAME);
+ generateCompleteFile(file);
+ tsFileResource = new TsFileResource(file);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+ // generate InsertRowPlan
+ long time = 6;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN,
TSDataType.FLOAT, TSDataType.TEXT
+ };
+ String[] columns = new String[] {1 + "", 1 + "", true + "", 1.0 + "", "1"};
+ InsertRowPlan insertRowPlan =
+ new InsertRowPlan(
+ new PartialPath(DEVICE3_NAME),
+ time,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ columns,
+ true);
+
+ // redo InsertRowPlan, vsg processor is used to test IdTable, don't
test IdTable here
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true,
null);
+ planRedoer.redoInsert(insertRowPlan);
+
+ // check data in memTable
+ IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+ // check d3
+ AlignedPath fullPath =
+ new AlignedPath(
+ DEVICE3_NAME,
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+ new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN,
TSEncoding.RLE),
+ new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+ new MeasurementSchema("s5", TSDataType.TEXT,
TSEncoding.PLAIN)));
+ ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath,
Long.MIN_VALUE, null);
+ IPointReader iterator = memChunk.getPointReader();
+ time = 6;
+ while (iterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+ assertEquals(time, timeValuePair.getTimestamp());
+ assertEquals(1, timeValuePair.getValue().getVector()[0].getInt());
+ assertEquals(1L, timeValuePair.getValue().getVector()[1].getLong());
+ assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+ assertEquals(1, timeValuePair.getValue().getVector()[3].getFloat(),
0.00001);
+ assertEquals(Binary.valueOf("1"),
timeValuePair.getValue().getVector()[4].getBinary());
+ ++time;
+ }
+ assertEquals(7, time);
+ }
+
@Test
public void testRedoInsertTabletPlan() throws Exception {
// generate .tsfile and update resource in memory
@@ -239,6 +334,94 @@ public class TsFilePlanRedoerTest {
assertEquals(8, time);
}
+ @Test
+ public void testRedoInsertAlignedTabletPlan() throws Exception {
+ // generate .tsfile and update resource in memory
+ File file = new File(FILE_NAME);
+ generateCompleteFile(file);
+ tsFileResource = new TsFileResource(file);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+ // generate InsertTabletPlan
+ long[] times = {6, 7, 8, 9};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[5];
+ columns[0] = new int[times.length];
+ columns[1] = new long[times.length];
+ columns[2] = new boolean[times.length];
+ columns[3] = new float[times.length];
+ columns[4] = new Binary[times.length];
+
+ for (int r = 0; r < times.length; r++) {
+ ((int[]) columns[0])[r] = (r + 1) * 100;
+ ((long[]) columns[1])[r] = (r + 1) * 100;
+ ((boolean[]) columns[2])[r] = true;
+ ((float[]) columns[3])[r] = (r + 1) * 100;
+ ((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
+ }
+
+ BitMap[] bitMaps = new BitMap[dataTypes.size()];
+ for (int i = 0; i < dataTypes.size(); i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(times.length);
+ }
+ // mark value of time=9 as null
+ bitMaps[i].mark(3);
+ }
+
+ InsertTabletPlan insertTabletPlan =
+ new InsertTabletPlan(
+ new PartialPath(DEVICE3_NAME),
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ true);
+ insertTabletPlan.setTimes(times);
+ insertTabletPlan.setColumns(columns);
+ insertTabletPlan.setRowCount(times.length);
+ insertTabletPlan.setBitMaps(bitMaps);
+
+ // redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true,
null);
+ planRedoer.redoInsert(insertTabletPlan);
+
+ // check data in memTable
+ IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+ // check d3
+ AlignedPath fullPath =
+ new AlignedPath(
+ DEVICE3_NAME,
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+ new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN,
TSEncoding.RLE),
+ new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+ new MeasurementSchema("s5", TSDataType.TEXT,
TSEncoding.PLAIN)));
+ ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath,
Long.MIN_VALUE, null);
+ IPointReader iterator = memChunk.getPointReader();
+ int time = 6;
+ while (iterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+ assertEquals(time, timeValuePair.getTimestamp());
+ assertEquals((time - 5) * 100,
timeValuePair.getValue().getVector()[0].getInt());
+ assertEquals((time - 5) * 100L,
timeValuePair.getValue().getVector()[1].getLong());
+ assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+ assertEquals((time - 5) * 100,
timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
+ assertEquals(
+ Binary.valueOf((time - 5) * 100 + ""),
+ timeValuePair.getValue().getVector()[4].getBinary());
+ ++time;
+ }
+ assertEquals(9, time);
+ }
+
@Test
public void testRedoOverLapPlanIntoSeqFile() throws Exception {
// generate .tsfile and update resource in memory
@@ -371,6 +554,111 @@ public class TsFilePlanRedoerTest {
assertTrue(modsFile.exists());
}
+ @Test
+ public void testRedoAlignedInsertAfterDeleteTimeseries() throws Exception {
+ // some timeseries have been deleted
+ IoTDB.schemaProcessor.deleteTimeseries(new
PartialPath(DEVICE3_NAME.concat(".s1")));
+ IoTDB.schemaProcessor.deleteTimeseries(new
PartialPath(DEVICE3_NAME.concat(".s5")));
+ // generate .tsfile and update resource in memory
+ File file = new File(FILE_NAME);
+ generateCompleteFile(file);
+ tsFileResource = new TsFileResource(file);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+ tsFileResource.updateStartTime(DEVICE3_NAME, 5);
+
+ // generate InsertTabletPlan
+ long[] times = {6, 7, 8, 9};
+ List<Integer> dataTypes =
+ Arrays.asList(
+ TSDataType.INT32.ordinal(),
+ TSDataType.INT64.ordinal(),
+ TSDataType.BOOLEAN.ordinal(),
+ TSDataType.FLOAT.ordinal(),
+ TSDataType.TEXT.ordinal());
+ Object[] columns =
+ new Object[] {
+ new int[times.length],
+ new long[times.length],
+ new boolean[times.length],
+ new float[times.length],
+ new Binary[times.length]
+ };
+ for (int r = 0; r < times.length; r++) {
+ ((int[]) columns[0])[r] = (r + 1) * 100;
+ ((long[]) columns[1])[r] = (r + 1) * 100;
+ ((boolean[]) columns[2])[r] = true;
+ ((float[]) columns[3])[r] = (r + 1) * 100;
+ ((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
+ }
+ BitMap[] bitMaps = new BitMap[dataTypes.size()];
+ for (int i = 0; i < dataTypes.size(); i++) {
+ if (bitMaps[i] == null) {
+ bitMaps[i] = new BitMap(times.length);
+ }
+ // mark value of time=9 as null
+ bitMaps[i].mark(3);
+ }
+ InsertTabletPlan insertTabletPlan =
+ new InsertTabletPlan(
+ new PartialPath(DEVICE3_NAME),
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes,
+ true);
+ insertTabletPlan.setTimes(times);
+ insertTabletPlan.setColumns(columns);
+ insertTabletPlan.setRowCount(times.length);
+ insertTabletPlan.setBitMaps(bitMaps);
+ // redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true,
null);
+ planRedoer.redoInsert(insertTabletPlan);
+
+ // generate InsertRowPlan
+ int time = 9;
+ TSDataType[] dataTypes2 =
+ new TSDataType[] {
+ TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN,
TSDataType.FLOAT, TSDataType.TEXT
+ };
+ String[] columns2 = new String[] {400 + "", 400 + "", true + "", 400.0 +
"", "400"};
+ InsertRowPlan insertRowPlan =
+ new InsertRowPlan(
+ new PartialPath(DEVICE3_NAME),
+ time,
+ new String[] {"s1", "s2", "s3", "s4", "s5"},
+ dataTypes2,
+ columns2,
+ true);
+ // redo InsertRowPlan, vsg processor is used to test IdTable, don't
test IdTable here
+ planRedoer.redoInsert(insertRowPlan);
+
+ // check data in memTable
+ IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
+ // check d3
+ AlignedPath fullPath =
+ new AlignedPath(
+ DEVICE3_NAME,
+ Arrays.asList("s1", "s2", "s3", "s4", "s5"),
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+ new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN,
TSEncoding.RLE),
+ new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+ new MeasurementSchema("s5", TSDataType.TEXT,
TSEncoding.PLAIN)));
+ ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath,
Long.MIN_VALUE, null);
+ IPointReader iterator = memChunk.getPointReader();
+ time = 6;
+ while (iterator.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = iterator.nextTimeValuePair();
+ assertEquals(time, timeValuePair.getTimestamp());
+ assertEquals(null, timeValuePair.getValue().getVector()[0]);
+ assertEquals((time - 5) * 100L,
timeValuePair.getValue().getVector()[1].getLong());
+ assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
+ assertEquals((time - 5) * 100,
timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
+ assertEquals(null, timeValuePair.getValue().getVector()[4]);
+ time++;
+ }
+ assertEquals(10, time);
+ }
+
private void generateCompleteFile(File tsFile) throws IOException,
WriteProcessException {
try (TsFileWriter writer = new TsFileWriter(tsFile)) {
writer.registerTimeseries(
@@ -381,6 +669,14 @@ public class TsFilePlanRedoerTest {
new Path(DEVICE2_NAME), new MeasurementSchema("s1",
TSDataType.FLOAT, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE2_NAME), new MeasurementSchema("s2",
TSDataType.DOUBLE, TSEncoding.RLE));
+ writer.registerAlignedTimeseries(
+ new Path(DEVICE3_NAME),
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
+ new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
+ new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
+ new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
+ new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
writer.write(
new TSRecord(1, DEVICE1_NAME)
.addTuple(new IntDataPoint("s1", 1))
@@ -397,6 +693,13 @@ public class TsFilePlanRedoerTest {
new TSRecord(4, DEVICE2_NAME)
.addTuple(new FloatDataPoint("s1", 4))
.addTuple(new DoubleDataPoint("s2", 4)));
+ writer.writeAligned(
+ new TSRecord(5, DEVICE3_NAME)
+ .addTuple(new IntDataPoint("s1", 5))
+ .addTuple(new LongDataPoint("s2", 5))
+ .addTuple(new BooleanDataPoint("s3", true))
+ .addTuple(new FloatDataPoint("s4", 5))
+ .addTuple(new StringDataPoint("s5", Binary.valueOf("5"))));
}
}
}