This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new 0a0a420a14 fix some bugs, and add some tests
0a0a420a14 is described below
commit 0a0a420a1476c755c96b2cd24eceeb48e71a1bd2
Author: Liu Xuxin <[email protected]>
AuthorDate: Wed Sep 7 17:05:53 2022 +0800
fix some bugs, and add some tests
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 2 +-
.../write/writer/MemoryControlTsFileIOWriter.java | 78 ++-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 9 +-
.../tsfile/write/TsFileIntegrityCheckingTool.java | 143 ++---
.../writer/MemoryControlTsFileIOWriterTest.java | 620 +++++++++++++++++++--
5 files changed, 697 insertions(+), 155 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index aa946f67b7..d15882b3b8 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -53,7 +53,7 @@ public class TsFileSequenceRead {
"squid:S106"
}) // Suppress high Cognitive Complexity and Standard outputs warning
public static void main(String[] args) throws IOException {
- String filename = "test.tsfile";
+ String filename = "C:\\Users\\MARKLAU\\Desktop\\iotdb\\1-1-0-0.tsfile";
if (args.length >= 1) {
filename = args[0];
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index b8d8c7b768..fc47e74210 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -64,7 +64,8 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
protected LocalTsFileOutput tempOutput;
protected final boolean autoControl;
// it stores the start address of the persisted chunk metadata for each series
- protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
+ // protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
+ protected volatile boolean hasChunkMetadataInDisk = false;
protected String currentSeries = null;
// record the total number of paths in order to build the bloom filter
protected int pathCount = 0;
@@ -109,10 +110,10 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
if (tempOutput == null) {
tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
}
+ hasChunkMetadataInDisk = true;
// the file structure in temp file will be
// ChunkType | chunkSize | chunkBuffer
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
- segmentForPerSeries.add(tempOutput.getPosition());
Path seriesPath = entry.getKey();
if (!seriesPath.equals(lastSerializePath)) {
pathCount++;
@@ -121,6 +122,11 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
lastSerializePath = seriesPath;
}
+ // clear the cached metadata to release memory
+ chunkGroupMetadataList.clear();
+ if (chunkMetadataList != null) {
+ chunkMetadataList.clear();
+ }
}
private void writeChunkMetadata(
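For orientation, the temp file written above is a flat sequence of records, each laid out as ChunkType | chunkSize | chunkBuffer. Below is a minimal read-back sketch, not part of the patch: it assumes ReadWriteIOUtils.readInt(InputStream) mirrors the write(size, output) call used by the writer, and leaves the actual metadata deserialization as a placeholder.

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

    // Hypothetical scanner for the temp-file layout described above.
    static void scanTempChunkMetadata(File tempFile) throws IOException {
      try (DataInputStream in =
          new DataInputStream(new BufferedInputStream(new FileInputStream(tempFile)))) {
        int type;
        while ((type = in.read()) != -1) {          // ChunkType: VECTOR_TYPE or NORMAL_TYPE marker
          int size = ReadWriteIOUtils.readInt(in);  // chunkSize: length of the serialized metadata
          byte[] buffer = new byte[size];           // chunkBuffer: one serialized chunk metadata
          in.readFully(buffer);
          // deserialize one IChunkMetadata (plus its series name) from buffer here
        }
      }
    }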
@@ -160,8 +166,8 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
private void writeAlignedChunkMetadata(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
throws IOException {
- ReadWriteIOUtils.write(VECTOR_TYPE, output);
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+ ReadWriteIOUtils.write(VECTOR_TYPE, output);
PublicBAOS buffer = new PublicBAOS();
int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getDevice());
ReadWriteIOUtils.write(size, output);
@@ -172,8 +178,8 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
private void writeNormalChunkMetadata(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
throws IOException {
- ReadWriteIOUtils.write(NORMAL_TYPE, output);
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+ ReadWriteIOUtils.write(NORMAL_TYPE, output);
PublicBAOS buffer = new PublicBAOS();
int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getFullPath());
ReadWriteIOUtils.write(size, output);
@@ -183,7 +189,7 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
@Override
public void endFile() throws IOException {
- if (this.segmentForPerSeries.size() > 0) {
+ if (hasChunkMetadataInDisk) {
// some chunk metadata has already been written to disk;
// first we should flush the remaining in-memory chunk metadata to disk,
// then read the persisted chunk metadata back from disk
@@ -192,7 +198,6 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
} else {
// sort the chunk metadata in memory, construct the index tree
// and just close the file
- tempOutput.close();
super.endFile();
return;
}
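Two notes on the endFile() change above: the disk check now uses the hasChunkMetadataInDisk flag instead of the removed queue, and the tempOutput.close() call is dropped from the in-memory branch, where tempOutput was presumably never opened and closing it would have thrown a NullPointerException. A simplified restatement of the resulting dispatch (comments only, not the literal method body):

    @Override
    public void endFile() throws IOException {
      if (hasChunkMetadataInDisk) {
        // flush the remaining in-memory chunk metadata to the temp file,
        // then read everything back from disk to build the index tree
      } else {
        super.endFile(); // all chunk metadata still in memory: the parent class finishes the file
        return;
      }
      // ... disk-based index construction continues in the real method
    }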
@@ -221,7 +226,7 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
chunkMetadataTempFile.length(),
new LocalTsFileInput(chunkMetadataTempFile.toPath()));
Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
- Queue<MetadataIndexNode> measurementMetadataIndexQueue = null;
+ Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
String currentDevice = null;
String prevDevice = null;
MetadataIndexNode currentIndexNode =
@@ -232,14 +237,17 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
BloomFilter.getEmptyBloomFilter(
TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(),
pathCount);
+ int indexCount = 0;
while (iterator.hasNextChunkMetadata()) {
// read in all chunk metadata of one series
// construct the timeseries metadata for this series
TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+ indexCount++;
// build bloom filter
filter.add(currentSeries);
// construct the index tree node for the series
- currentDevice = new Path(currentSeries).getDevice();
+ Path currentPath = new Path(currentSeries, true);
+ currentDevice = currentPath.getDevice();
if (!currentDevice.equals(prevDevice)) {
if (prevDevice != null) {
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
@@ -259,7 +267,7 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
}
currentIndexNode.addEntry(
- new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
}
prevDevice = currentDevice;
@@ -268,6 +276,18 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
timeseriesMetadata.serializeTo(out.wrapAsStream());
}
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+
+ if (indexCount != pathCount) {
+ throw new IOException(
+ String.format(
+ "Expected path count is %d, index path count is %d", pathCount,
indexCount));
+ }
+
MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
TsFileMetadata tsFileMetadata = new TsFileMetadata();
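The loop above adds every series path to a bloom filter sized by pathCount, and the new indexCount check guards against the index and the declared path count disagreeing about how many series were written. On the read side the filter is used to skip lookups for series that are definitely absent; a small illustration, assuming BloomFilter.contains(String) is the query-side counterpart of the add(String) call used above:

    BloomFilter filter =
        BloomFilter.getEmptyBloomFilter(
            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), 2);
    filter.add("root.sg.d0.s0");
    filter.add("root.sg.d0.s1");
    // A series that was added always answers true (no false negatives):
    boolean present = filter.contains("root.sg.d0.s0"); // true
    // An absent series usually answers false; a rare false positive only costs
    // one unnecessary index lookup, never a missed series.
    boolean probablyAbsent = filter.contains("root.sg.d0.s999");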
@@ -290,20 +310,12 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
*/
private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator iterator)
throws IOException {
- Pair<String, IChunkMetadata> currentPair = iterator.getCurrentPair();
- if (currentPair == null) {
- currentPair = iterator.getNextSeriesNameAndChunkMetadata();
- }
- if (!currentPair.left.equals(currentSeries)) {
- // come to a new series
- currentSeries = currentPair.left;
- }
List<IChunkMetadata> iChunkMetadataList = new ArrayList<>();
- while (currentPair != null && currentPair.left.equals(currentSeries)) {
- iChunkMetadataList.add(currentPair.right);
- currentPair = iterator.getNextSeriesNameAndChunkMetadata();
- }
- return super.constructOneTimeseriesMetadata(new Path(currentSeries), iChunkMetadataList, false);
+ currentSeries = iterator.getAllChunkMetadataForNextSeries(iChunkMetadataList);
+ TimeseriesMetadata timeseriesMetadata =
+ super.constructOneTimeseriesMetadata(new Path(currentSeries), iChunkMetadataList, false);
+ timeseriesMetadata.setMeasurementId(new Path(currentSeries, true).getMeasurement());
+ return timeseriesMetadata;
}
@Override
@@ -358,6 +370,28 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
return currentPair;
}
+ public String getAllChunkMetadataForNextSeries(List<IChunkMetadata> iChunkMetadataList)
+ throws IOException {
+ if (currentPair == null) {
+ if (!hasNextChunkMetadata()) {
+ return null;
+ } else {
+ getNextSeriesNameAndChunkMetadata();
+ }
+ }
+ String currentSeries = currentPair.left;
+ iChunkMetadataList.add(currentPair.right);
+ while (hasNextChunkMetadata()) {
+ getNextSeriesNameAndChunkMetadata();
+ if (currentPair != null && currentPair.left.equals(currentSeries)) {
+ iChunkMetadataList.add(currentPair.right);
+ } else {
+ break;
+ }
+ }
+ return currentSeries;
+ }
+
public Pair<String, IChunkMetadata> getCurrentPair() {
return currentPair;
}
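A usage sketch for the new getAllChunkMetadataForNextSeries method: it appends every consecutive chunk metadata entry of one series to the caller's list and returns that series name, or null once the iterator is exhausted. Note that it only appends, so the caller clears or replaces the list between series (the process consumer below is hypothetical):

    List<IChunkMetadata> chunkMetadataOfSeries = new ArrayList<>();
    String series;
    while ((series = iterator.getAllChunkMetadataForNextSeries(chunkMetadataOfSeries)) != null) {
      // chunkMetadataOfSeries now holds all consecutive entries of `series`
      process(series, chunkMetadataOfSeries);
      chunkMetadataOfSeries.clear();
    }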
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index aba72ce2e4..89f5ad6d7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -253,11 +253,10 @@ public class TsFileIOWriter implements AutoCloseable {
}
if (chunkMetadataList != null && chunkMetadataList.size() > 0) {
- ChunkMetadata chunkMetadata = chunkMetadataList.get(0);
- Path series = new Path(currentChunkGroupDeviceId, chunkMetadata.getMeasurementUid());
- chunkMetadataListMap
- .computeIfAbsent(series, k -> new ArrayList<>())
- .addAll(chunkMetadataList);
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Path series = new Path(currentChunkGroupDeviceId, chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ }
}
return chunkMetadataListMap;
}
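The replaced code keyed the whole chunkMetadataList on its first entry's measurement, so chunks of other measurements in the same chunk group would be filed under the wrong series; the loop above keys each entry individually. A generic illustration of the computeIfAbsent grouping idiom with toy data (not IoTDB types):

    Map<String, List<Integer>> byMeasurement = new HashMap<>();
    List<String> measurements = Arrays.asList("s0", "s1", "s0");
    for (int i = 0; i < measurements.size(); i++) {
      // one map entry per distinct key; chunk indices appended in encounter order
      byMeasurement.computeIfAbsent(measurements.get(i), k -> new ArrayList<>()).add(i);
    }
    // byMeasurement -> {s0=[0, 2], s1=[1]};
    // keying everything on measurements.get(0) would have filed all three under "s0"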
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
similarity index 52%
copy from example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
copy to tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
index aa946f67b7..b635d63a9c 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -16,7 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile;
+
+package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -27,49 +28,42 @@ import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
-/** This tool is used to read TsFile sequentially, including nonAligned or aligned timeseries. */
-public class TsFileSequenceRead {
- // if you wanna print detailed datas in pages, then turn it true.
- private static boolean printDetail = false;
+/** This class provides some static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+ private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
- @SuppressWarnings({
- "squid:S3776",
- "squid:S106"
- }) // Suppress high Cognitive Complexity and Standard outputs warning
- public static void main(String[] args) throws IOException {
- String filename = "test.tsfile";
- if (args.length >= 1) {
- filename = args[0];
- }
+ public static void checkIntegrityBySequenceRead(String filename) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
- System.out.println(
- "file length: " + FSFactoryProducer.getFSFactory().getFile(filename).length());
- System.out.println("file magic head: " + reader.readHeadMagic());
- System.out.println("file magic tail: " + reader.readTailMagic());
- System.out.println("Level 1 metadata position: " + reader.getFileMetadataPos());
- System.out.println("Level 1 metadata size: " + reader.getTsFileMetadataSize());
- // Sequential reading of one ChunkGroup now follows this order:
- // first the CHUNK_GROUP_HEADER, then SeriesChunks (headers and data) in one ChunkGroup
- // Because we do not know how many chunks a ChunkGroup may have, we should read one byte (the
- // marker) ahead and judge accordingly.
+ String headMagicString = reader.readHeadMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString);
+ String tailMagicString = reader.readTailMagic();
+ Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString);
reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
- System.out.println("position: " + reader.position());
List<long[]> timeBatch = new ArrayList<>();
int pageIndex = 0;
byte marker;
@@ -81,18 +75,11 @@ public class TsFileSequenceRead {
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
- System.out.println("\t[Chunk]");
- System.out.println("\tchunk type: " + marker);
- System.out.println("\tposition: " + reader.position());
ChunkHeader header = reader.readChunkHeader(marker);
- System.out.println("\tMeasurement: " + header.getMeasurementID());
if (header.getDataSize() == 0) {
// empty value chunk
- System.out.println("\t-- Empty Chunk ");
break;
}
- System.out.println(
- "\tChunk Size: " + (header.getDataSize() + header.getSerializedSize()));
Decoder defaultTimeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
@@ -106,97 +93,83 @@ public class TsFileSequenceRead {
}
while (dataSize > 0) {
valueDecoder.reset();
- System.out.println(
- "\t\t[Page" + pageIndex + "]\n \t\tPage head position: " + reader.position());
PageHeader pageHeader =
reader.readPageHeader(
header.getDataType(),
(header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
- System.out.println("\t\tPage data position: " + reader.position());
ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
- System.out.println(
- "\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
- System.out.println(
- "\t\tCompressed page data size: " + pageHeader.getCompressedSize());
if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
== (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
TimePageReader timePageReader =
new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
timeBatch.add(timePageReader.getNextTimeBatch());
- System.out.println("\t\tpoints in the page: " + timeBatch.get(pageIndex).length);
- if (printDetail) {
- for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
- System.out.println("\t\t\ttime: " + timeBatch.get(pageIndex)[i]);
- }
- }
} else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
== (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
ValuePageReader valuePageReader =
new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
- if (valueBatch.length == 0) {
- System.out.println("\t\t-- Empty Page ");
- } else {
- System.out.println("\t\tpoints in the page: " + valueBatch.length);
- }
- if (printDetail) {
- for (TsPrimitiveType batch : valueBatch) {
- System.out.println("\t\t\tvalue: " + batch);
- }
- }
} else { // NonAligned Chunk
PageReader pageReader =
new PageReader(
pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- if (header.getChunkType() == MetaMarker.CHUNK_HEADER) {
- System.out.println("\t\tpoints in the page: " + pageHeader.getNumOfValues());
- } else {
- System.out.println("\t\tpoints in the page: " + batchData.length());
- }
- if (printDetail) {
- while (batchData.hasCurrent()) {
- System.out.println(
- "\t\t\ttime, value: "
- + batchData.currentTime()
- + ", "
- + batchData.currentValue());
- batchData.next();
- }
- }
}
pageIndex++;
dataSize -= pageHeader.getSerializedPageSize();
}
break;
case MetaMarker.CHUNK_GROUP_HEADER:
- System.out.println("[Chunk Group]");
- System.out.println("Chunk Group Header position: " +
reader.position());
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- System.out.println("device: " + chunkGroupHeader.getDeviceID());
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
- System.out.println("minPlanIndex: " + reader.getMinPlanIndex());
- System.out.println("maxPlanIndex: " + reader.getMaxPlanIndex());
break;
default:
MetaMarker.handleUnexpectedMarker(marker);
}
}
- System.out.println("[Metadata]");
- for (String device : reader.getAllDevices()) {
- Map<String, List<ChunkMetadata>> seriesMetaData =
reader.readChunkMetadataInDevice(device);
- System.out.printf(
- "\t[Device]Device %s, Number of Measurements %d%n", device,
seriesMetaData.size());
- for (Map.Entry<String, List<ChunkMetadata>> serie :
seriesMetaData.entrySet()) {
- System.out.println("\t\tMeasurement:" + serie.getKey());
- for (ChunkMetadata chunkMetadata : serie.getValue()) {
- System.out.println("\t\tFile offset:" +
chunkMetadata.getOffsetOfChunkHeader());
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
+ }
+ }
+
+ public static void checkIntegrityByQuery(
+ String filename, Map<String, Map<String, List<List<Long>>>> originData) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+ Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata =
+ reader.getAllTimeseriesMetadata(true);
+ Assert.assertEquals(originData.size(), allTimeseriesMetadata.size());
+ for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) {
+ String deviceId = entry.getKey();
+ List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
+ Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size());
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ String measurementId = timeseriesMetadata.getMeasurementId();
+ List<List<Long>> originChunks = originData.get(deviceId).get(measurementId);
+ List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList();
+ Assert.assertEquals(originChunks.size(), chunkMetadataList.size());
+ chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime));
+ for (int i = 0; i < chunkMetadataList.size(); ++i) {
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i));
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ List<Long> originValue = originChunks.get(i);
+ for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) {
+ IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ Assert.assertEquals(
+ originValue.get(valIdx++).longValue(),
+ pointReader.nextTimeValuePair().getTimestamp());
+ }
+ }
+ }
+ }
}
}
}
+
+ } catch (IOException e) {
+ LOG.error("Meet exception when checking integrity of tsfile", e);
+ Assert.fail();
}
}
}
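A usage sketch for the new helper class, mirroring how the tests below call it; the originData map shape follows checkIntegrityByQuery's signature, device -> measurement -> list of per-chunk timestamp lists:

    Map<String, Map<String, List<List<Long>>>> originData = new HashMap<>();
    List<Long> chunkTimes = new ArrayList<>();
    for (long t = 0; t < 10; ++t) { // one chunk of timestamps 0..9 for root.sg.d0.s0
      chunkTimes.add(t);
    }
    originData
        .computeIfAbsent("root.sg.d0", d -> new HashMap<>())
        .computeIfAbsent("s0", m -> new ArrayList<>())
        .add(chunkTimes);
    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);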
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
index 392a699222..32d2248ba5 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -41,24 +42,47 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter {
private static File testFile = new File("target", "1-1-0-0.tsfile");
private static File emptyFile = new File("target", "temp");
- private static final int TEST_CHUNK_SIZE = 1000;
+ private long TEST_CHUNK_SIZE = 1000;
+ private List<String> measurementDictInOrder = new ArrayList<>();
+ private List<String> deviceDictInOrder = new ArrayList<>();
+ private boolean init = false;
@Before
- public void setUp() throws IOException {}
+ public void setUp() throws IOException {
+ if (!init) {
+ init = true;
+ for (int i = 0; i < 2048; ++i) {
+ measurementDictInOrder.add("s" + i);
+ deviceDictInOrder.add("root.sg.d" + i);
+ }
+ measurementDictInOrder.sort((String::compareTo));
+ deviceDictInOrder.sort((String::compareTo));
+ }
+ }
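The dictionaries are pre-sorted with String::compareTo because the writer expects devices and measurements in lexicographic order, which differs from numeric order once the generated suffixes exceed one digit:

    List<String> devices = new ArrayList<>(Arrays.asList("root.sg.d2", "root.sg.d10", "root.sg.d1"));
    devices.sort(String::compareTo);
    // devices -> [root.sg.d1, root.sg.d10, root.sg.d2], not [d1, d2, d10]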
@After
public void tearDown() throws IOException {
this.close();
- FileUtils.delete(testFile);
- FileUtils.delete(
- new File(testFile.getPath() + MemoryControlTsFileIOWriter.CHUNK_METADATA_TEMP_FILE_PREFIX));
- FileUtils.delete(emptyFile);
+ if (testFile.exists()) {
+ FileUtils.delete(testFile);
+ }
+ if (new File(testFile.getPath() + MemoryControlTsFileIOWriter.CHUNK_METADATA_TEMP_FILE_PREFIX)
+ .exists()) {
+ FileUtils.delete(
+ new File(
+ testFile.getPath() + MemoryControlTsFileIOWriter.CHUNK_METADATA_TEMP_FILE_PREFIX));
+ }
+ if (emptyFile.exists()) {
+ FileUtils.delete(emptyFile);
+ }
}
public MemoryControlTsFileIOWriterTest() throws IOException {
@@ -71,26 +95,26 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new MemoryControlTsFileIOWriter(testFile, 1024 * 1024 * 10, true)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
- String deviceId = "root.sg.d" + i;
+ String deviceId = deviceDictInOrder.get(i);
writer.startChunkGroup(deviceId);
for (int j = 0; j < 5; ++j) {
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
- chunkWriter = generateIntData(j);
+ chunkWriter = generateIntData(j, 0L);
break;
case 1:
- chunkWriter = generateBooleanData(j);
+ chunkWriter = generateBooleanData(j, 0);
break;
case 2:
- chunkWriter = generateFloatData(j);
+ chunkWriter = generateFloatData(j, 0L);
break;
case 3:
- chunkWriter = generateDoubleData(j);
+ chunkWriter = generateDoubleData(j, 0L);
break;
case 4:
default:
- chunkWriter = generateTextData(j);
+ chunkWriter = generateTextData(j, 0L);
break;
}
chunkWriter.writeToFileWriter(writer);
@@ -109,7 +133,9 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
for (int i = 0; i < originChunkMetadataList.size(); ++i) {
Pair<String, IChunkMetadata> chunkMetadataPair = window.getNextSeriesNameAndChunkMetadata();
- Assert.assertEquals("root.sg.d" + i / 5 + ".s" + i % 5, chunkMetadataPair.left);
+ Assert.assertEquals(
+ deviceDictInOrder.get(i / 5) + "." + measurementDictInOrder.get(i % 5),
+ chunkMetadataPair.left);
Assert.assertEquals(
originChunkMetadataList.get(i).getStartTime(),
chunkMetadataPair.right.getStartTime());
Assert.assertEquals(
@@ -129,9 +155,9 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new MemoryControlTsFileIOWriter(testFile, 1024 * 1024 * 10, true)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
- String deviceId = "root.sg.d" + i;
+ String deviceId = deviceDictInOrder.get(i);
writer.startChunkGroup(deviceId);
- AlignedChunkWriterImpl chunkWriter = generateVectorData(i);
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(i, 0L);
chunkWriter.writeToFileWriter(writer);
originChunkMetadataList.addAll(writer.chunkMetadataList);
writer.endChunkGroup();
@@ -165,7 +191,7 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new LocalTsFileInput(writer.chunkMetadataTempFile.toPath()));
for (int i = 0; i < alignedChunkMetadata.size(); ++i) {
Pair<String, IChunkMetadata> chunkMetadataPair = window.getNextSeriesNameAndChunkMetadata();
- Assert.assertEquals("root.sg.d" + i, chunkMetadataPair.left);
+ Assert.assertEquals(deviceDictInOrder.get(i), chunkMetadataPair.left);
Assert.assertEquals(
alignedChunkMetadata.get(i).getStartTime(),
chunkMetadataPair.right.getStartTime());
Assert.assertEquals(
@@ -184,7 +210,7 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
new MemoryControlTsFileIOWriter(testFile, 1024 * 1024 * 10, true)) {
List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
- String deviceId = "root.sg.d" + i;
+ String deviceId = deviceDictInOrder.get(i);
writer.startChunkGroup(deviceId);
if (i % 2 == 0) {
// write normal series
@@ -192,20 +218,20 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
ChunkWriterImpl chunkWriter;
switch (j) {
case 0:
- chunkWriter = generateIntData(j);
+ chunkWriter = generateIntData(j, 0L);
break;
case 1:
- chunkWriter = generateBooleanData(j);
+ chunkWriter = generateBooleanData(j, 0L);
break;
case 2:
- chunkWriter = generateFloatData(j);
+ chunkWriter = generateFloatData(j, 0L);
break;
case 3:
- chunkWriter = generateDoubleData(j);
+ chunkWriter = generateDoubleData(j, 0L);
break;
case 4:
default:
- chunkWriter = generateTextData(j);
+ chunkWriter = generateTextData(j, 0L);
break;
}
chunkWriter.writeToFileWriter(writer);
@@ -213,7 +239,7 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
originChunkMetadataList.addAll(writer.chunkMetadataList);
} else {
// write vector
- AlignedChunkWriterImpl chunkWriter = generateVectorData(i);
+ AlignedChunkWriterImpl chunkWriter = generateVectorData(i, 0L);
chunkWriter.writeToFileWriter(writer);
originChunkMetadataList.add(
new AlignedChunkMetadata(
@@ -236,11 +262,13 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
Pair<String, IChunkMetadata> chunkMetadataPair = window.getNextSeriesNameAndChunkMetadata();
if (originChunkMetadataList.get(i) instanceof ChunkMetadata) {
Assert.assertEquals(
- "root.sg.d" + deviceCnt + "." +
originChunkMetadataList.get(i).getMeasurementUid(),
+ deviceDictInOrder.get(deviceCnt)
+ + "."
+ + originChunkMetadataList.get(i).getMeasurementUid(),
chunkMetadataPair.left);
} else {
deviceCnt++;
- Assert.assertEquals("root.sg.d" + deviceCnt++,
chunkMetadataPair.left);
+ Assert.assertEquals(deviceDictInOrder.get(deviceCnt++),
chunkMetadataPair.left);
}
Assert.assertEquals(
originChunkMetadataList.get(i).getStartTime(),
chunkMetadataPair.right.getStartTime());
@@ -255,47 +283,554 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
}
}
- private ChunkWriterImpl generateIntData(int idx) {
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write one
+ * chunk for it. This test makes sure that each chunk can be written and read back correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Long>>>> originTimes = new HashMap<>();
+ try (MemoryControlTsFileIOWriter writer =
+ new MemoryControlTsFileIOWriter(testFile, 1024, true)) {
+ List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = deviceDictInOrder.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ chunkWriter = generateIntData(j, 0L);
+ break;
+ case 1:
+ chunkWriter = generateBooleanData(j, 0L);
+ break;
+ case 2:
+ chunkWriter = generateFloatData(j, 0L);
+ break;
+ case 3:
+ chunkWriter = generateDoubleData(j, 0L);
+ break;
+ case 4:
+ default:
+ chunkWriter = generateTextData(j, 0L);
+ break;
+ }
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = 0; t < TEST_CHUNK_SIZE; ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ originChunkMetadataList.addAll(writer.chunkMetadataList);
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 100
+ * chunks for it. This test makes sure that each chunk can be written and read back correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Long>>>> originTimes = new HashMap<>();
+ try (MemoryControlTsFileIOWriter writer =
+ new MemoryControlTsFileIOWriter(testFile, 1024, true)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = deviceDictInOrder.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 10 devices and 5 series in each device. For each series, we write 1024
+ * chunks for it. This test makes sure that each chunk can be written and read back correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+ Map<String, Map<String, List<List<Long>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (MemoryControlTsFileIOWriter writer =
+ new MemoryControlTsFileIOWriter(testFile, 1024, true)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = deviceDictInOrder.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j) {
+ case 0:
+ for (int k = 0; k < 1024; ++k) {
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 1024; ++k) {
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 1024; ++k) {
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 1024; ++k) {
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 1024; ++k) {
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 10 devices and 1024 series in each device. For each series, we write 100
+ * chunks for it. This test makes sure that each chunk can be written and read back correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+ Map<String, Map<String, List<List<Long>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (MemoryControlTsFileIOWriter writer =
+ new MemoryControlTsFileIOWriter(testFile, 1024, true)) {
+ for (int i = 0; i < 10; ++i) {
+ String deviceId = deviceDictInOrder.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 1024; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 100; ++k) {
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 100; ++k) {
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 100; ++k) {
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 100; ++k) {
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 100; ++k) {
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ /**
+ * Write a file with 1024 devices and 5 series in each device. For each series, we write 10
+ * chunks for it. This test makes sure that each chunk can be written and read back correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+ Map<String, Map<String, List<List<Long>>>> originTimes = new HashMap<>();
+ long originTestChunkSize = TEST_CHUNK_SIZE;
+ TEST_CHUNK_SIZE = 10;
+ try (MemoryControlTsFileIOWriter writer =
+ new MemoryControlTsFileIOWriter(testFile, 1024, true)) {
+ for (int i = 0; i < 1024; ++i) {
+ String deviceId = deviceDictInOrder.get(i);
+ writer.startChunkGroup(deviceId);
+ for (int j = 0; j < 5; ++j) {
+ ChunkWriterImpl chunkWriter;
+ switch (j % 5) {
+ case 0:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 1:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 2:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 3:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ case 4:
+ default:
+ for (int k = 0; k < 10; ++k) {
+ chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k);
+ chunkWriter.writeToFileWriter(writer);
+ List<Long> times = new ArrayList<>();
+ for (long t = (long) TEST_CHUNK_SIZE * k;
+ t < (long) TEST_CHUNK_SIZE * (k + 1);
+ ++t) {
+ times.add(t);
+ }
+ originTimes
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(measurementDictInOrder.get(j), x -> new ArrayList<>())
+ .add(times);
+ }
+ break;
+ }
+ }
+ writer.endChunkGroup();
+ }
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ writer.endFile();
+ } finally {
+ TEST_CHUNK_SIZE = originTestChunkSize;
+ }
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+ }
+
+ private ChunkWriterImpl generateIntData(int idx, long startTime) {
ChunkWriterImpl chunkWriter =
- new ChunkWriterImpl(new MeasurementSchema("s" + idx,
TSDataType.INT64));
+ new ChunkWriterImpl(
+ new MeasurementSchema(measurementDictInOrder.get(idx),
TSDataType.INT64));
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
chunkWriter.write(i, random.nextLong());
}
return chunkWriter;
}
- private ChunkWriterImpl generateFloatData(int idx) {
+ private ChunkWriterImpl generateFloatData(int idx, long startTime) {
ChunkWriterImpl chunkWriter =
- new ChunkWriterImpl(new MeasurementSchema("s" + idx,
TSDataType.FLOAT));
+ new ChunkWriterImpl(
+ new MeasurementSchema(measurementDictInOrder.get(idx),
TSDataType.FLOAT));
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
chunkWriter.write(i, random.nextFloat());
}
return chunkWriter;
}
- private ChunkWriterImpl generateDoubleData(int idx) {
+ private ChunkWriterImpl generateDoubleData(int idx, long startTime) {
ChunkWriterImpl chunkWriter =
- new ChunkWriterImpl(new MeasurementSchema("s" + idx,
TSDataType.DOUBLE));
+ new ChunkWriterImpl(
+ new MeasurementSchema(measurementDictInOrder.get(idx),
TSDataType.DOUBLE));
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
chunkWriter.write(i, random.nextDouble());
}
return chunkWriter;
}
- private ChunkWriterImpl generateBooleanData(int idx) {
+ private ChunkWriterImpl generateBooleanData(int idx, long startTime) {
ChunkWriterImpl chunkWriter =
- new ChunkWriterImpl(new MeasurementSchema("s" + idx,
TSDataType.BOOLEAN));
+ new ChunkWriterImpl(
+ new MeasurementSchema(measurementDictInOrder.get(idx),
TSDataType.BOOLEAN));
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
chunkWriter.write(i, random.nextBoolean());
}
return chunkWriter;
}
- private AlignedChunkWriterImpl generateVectorData(int idx) {
+ private AlignedChunkWriterImpl generateVectorData(int idx, long startTime) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
measurementSchemas.add(new MeasurementSchema("", TSDataType.INT32));
measurementSchemas.add(new MeasurementSchema("", TSDataType.INT64));
@@ -305,7 +840,7 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
measurementSchemas.add(new MeasurementSchema("", TSDataType.TEXT));
AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
TsPrimitiveType[] points = new TsPrimitiveType[6];
points[0] = new TsPrimitiveType.TsInt(random.nextInt());
points[1] = new TsPrimitiveType.TsLong(random.nextLong());
@@ -318,11 +853,12 @@ public class MemoryControlTsFileIOWriterTest extends MemoryControlTsFileIOWriter
return chunkWriter;
}
- private ChunkWriterImpl generateTextData(int idx) {
+ private ChunkWriterImpl generateTextData(int idx, long startTime) {
ChunkWriterImpl chunkWriter =
- new ChunkWriterImpl(new MeasurementSchema("s" + idx, TSDataType.TEXT));
+ new ChunkWriterImpl(
+ new MeasurementSchema(measurementDictInOrder.get(idx), TSDataType.TEXT));
Random random = new Random();
- for (int i = 0; i < TEST_CHUNK_SIZE; ++i) {
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
chunkWriter.write(i, new Binary(String.valueOf(random.nextDouble())));
}
return chunkWriter;
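The five switch cases in each test above differ only in which generator they call; a compact helper along these lines could express the shared loop once (a sketch, not part of the patch; LongFunction pre-binds the measurement index of the generate*Data(int, long) methods):

    import java.util.function.LongFunction;

    // Hypothetical helper: write `chunkCount` chunks for one series and record their timestamps.
    private void writeChunks(
        MemoryControlTsFileIOWriter writer,
        String deviceId,
        String measurementId,
        int chunkCount,
        LongFunction<ChunkWriterImpl> generator,
        Map<String, Map<String, List<List<Long>>>> originTimes)
        throws IOException {
      for (int k = 0; k < chunkCount; ++k) {
        ChunkWriterImpl chunkWriter = generator.apply((long) TEST_CHUNK_SIZE * k);
        chunkWriter.writeToFileWriter(writer);
        List<Long> times = new ArrayList<>();
        for (long t = (long) TEST_CHUNK_SIZE * k; t < (long) TEST_CHUNK_SIZE * (k + 1); ++t) {
          times.add(t);
        }
        originTimes
            .computeIfAbsent(deviceId, x -> new HashMap<>())
            .computeIfAbsent(measurementId, x -> new ArrayList<>())
            .add(times);
      }
    }

    // e.g. writeChunks(writer, deviceId, measurementDictInOrder.get(j), 10,
    //     start -> generateIntData(j, start), originTimes);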