This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-4780 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6b61cda09e8ddb6fcf1b9896335be4365cab6cfb Author: LiuXuxin <[email protected]> AuthorDate: Thu Oct 27 21:48:13 2022 +0800 temp --- .../java/org/apache/iotdb/RewriteTsFileTool.java | 94 +++++++++++++++++++++- 1 file changed, 91 insertions(+), 3 deletions(-) diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java index f403f60bfd..d3851bccdb 100644 --- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java +++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java @@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; @@ -97,6 +98,10 @@ public class RewriteTsFileTool { private static String filePath = ""; private static String readMode = "s"; private static boolean ignoreBrokenChunk = false; + private static boolean writeAsAligned = true; + + private static Map<String, MeasurementSchema> schemaMap = new HashMap<>(); + private static Map<Long, Map<String, TsPrimitiveType>> timeValuePairMap = new HashMap<>(); public static void main(String[] args) { Session session = null; @@ -693,6 +698,60 @@ public class RewriteTsFileTool { while (seriesIterator.hasNextSeries()) { writeSingleSeries(device, seriesIterator, session); } + if (writeAsAligned) { + List<MeasurementSchema> schemas = new ArrayList<>(schemaMap.values()); + Tablet tablet = new Tablet(device, schemas, MAX_TABLET_LENGTH); + for (Map.Entry<Long, Map<String, TsPrimitiveType>> entry : + timeValuePairMap.entrySet()) { + tablet.addTimestamp(tablet.rowSize, entry.getKey()); + Map<String, TsPrimitiveType> valueMap = entry.getValue(); + for (MeasurementSchema schema : schemas) { + TsPrimitiveType tsPrimitiveType = + valueMap.getOrDefault(schema.getMeasurementId(), null); + if (tsPrimitiveType != null) { + switch (tsPrimitiveType.getDataType()) { + case BOOLEAN: + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + tsPrimitiveType.getBoolean()); + break; + case INT32: + tablet.addValue( + schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getInt()); + break; + case INT64: + tablet.addValue( + schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getLong()); + break; + case FLOAT: + tablet.addValue( + schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getFloat()); + break; + case DOUBLE: + tablet.addValue( + schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getDouble()); + break; + case TEXT: + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + tsPrimitiveType.getStringValue()); + break; + } + } + } + tablet.rowSize++; + if (tablet.rowSize > MAX_TABLET_LENGTH) { + session.insertAlignedTablet(tablet); + tablet.reset(); + } + } + if (tablet.rowSize > 0) { + session.insertAlignedTablet(tablet); + tablet.reset(); + } + } } } } @@ -704,9 +763,10 @@ public class RewriteTsFileTool { protected static void writeSingleSeries( String device, MultiTsFileDeviceIterator.MeasurementIterator seriesIterator, Session session) throws IllegalPathException { - PartialPath p = new PartialPath(device, seriesIterator.nextSeries()); + // PartialPath p = new PartialPath(device, seriesIterator.nextSeries()); LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList = seriesIterator.getMetadataListForCurrentSeries(); + String series = seriesIterator.nextSeries(); while (!readerAndChunkMetadataList.isEmpty()) { Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair = readerAndChunkMetadataList.removeFirst(); @@ -714,16 +774,44 @@ public class RewriteTsFileTool { List<ChunkMetadata> chunkMetadataList = readerMetadataPair.right; for (ChunkMetadata chunkMetadata : chunkMetadataList) { try { - writeSingleChunk(device, p, chunkMetadata, reader, session); + // if (!writeAsAligned) { + // writeSingleChunk(device, p, chunkMetadata, reader, session); + // } else { + cacheForAligned(device, series, chunkMetadata, reader); + // } } catch (Throwable t) { // this is a broken chunk, skip it t.printStackTrace(); - System.out.printf("Skip broken chunk in device %s.%s%n", device, p.getMeasurement()); + System.out.printf("Skip broken chunk in device %s.%s%n", device, series); } } } } + protected static void cacheForAligned( + String device, String measurement, ChunkMetadata chunkMetadata, TsFileSequenceReader reader) + throws IOException, IoTDBConnectionException, StatementExecutionException { + Chunk chunk = reader.readMemChunk(chunkMetadata); + ChunkHeader chunkHeader = chunk.getHeader(); + MeasurementSchema schema = + new MeasurementSchema( + measurement, + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + CompressionType.GZIP); + schemaMap.computeIfAbsent(measurement, x -> schema); + IChunkReader chunkReader = new ChunkReader(chunk, null); + while (chunkReader.hasNextSatisfiedPage()) { + IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator(); + while (batchIterator.hasNextTimeValuePair()) { + TimeValuePair timeValuePair = batchIterator.nextTimeValuePair(); + timeValuePairMap + .computeIfAbsent(timeValuePair.getTimestamp(), x -> new HashMap<>()) + .put(measurement, timeValuePair.getValue()); + } + } + } + /** Read and write a single chunk for not aligned series. */ protected static void writeSingleChunk( String device,
