This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4780
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6b61cda09e8ddb6fcf1b9896335be4365cab6cfb
Author: LiuXuxin <[email protected]>
AuthorDate: Thu Oct 27 21:48:13 2022 +0800

    temp
---
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 94 +++++++++++++++++++++-
 1 file changed, 91 insertions(+), 3 deletions(-)

diff --git 
a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java 
b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index f403f60bfd..d3851bccdb 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -97,6 +98,10 @@ public class RewriteTsFileTool {
   private static String filePath = "";
   private static String readMode = "s";
   private static boolean ignoreBrokenChunk = false;
+  private static boolean writeAsAligned = true;
+
+  private static Map<String, MeasurementSchema> schemaMap = new HashMap<>();
+  private static Map<Long, Map<String, TsPrimitiveType>> timeValuePairMap = 
new HashMap<>();
 
   public static void main(String[] args) {
     Session session = null;
@@ -693,6 +698,60 @@ public class RewriteTsFileTool {
             while (seriesIterator.hasNextSeries()) {
               writeSingleSeries(device, seriesIterator, session);
             }
+            if (writeAsAligned) {
+              List<MeasurementSchema> schemas = new 
ArrayList<>(schemaMap.values());
+              Tablet tablet = new Tablet(device, schemas, MAX_TABLET_LENGTH);
+              for (Map.Entry<Long, Map<String, TsPrimitiveType>> entry :
+                  timeValuePairMap.entrySet()) {
+                tablet.addTimestamp(tablet.rowSize, entry.getKey());
+                Map<String, TsPrimitiveType> valueMap = entry.getValue();
+                for (MeasurementSchema schema : schemas) {
+                  TsPrimitiveType tsPrimitiveType =
+                      valueMap.getOrDefault(schema.getMeasurementId(), null);
+                  if (tsPrimitiveType != null) {
+                    switch (tsPrimitiveType.getDataType()) {
+                      case BOOLEAN:
+                        tablet.addValue(
+                            schema.getMeasurementId(),
+                            tablet.rowSize,
+                            tsPrimitiveType.getBoolean());
+                        break;
+                      case INT32:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, 
tsPrimitiveType.getInt());
+                        break;
+                      case INT64:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, 
tsPrimitiveType.getLong());
+                        break;
+                      case FLOAT:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, 
tsPrimitiveType.getFloat());
+                        break;
+                      case DOUBLE:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, 
tsPrimitiveType.getDouble());
+                        break;
+                      case TEXT:
+                        tablet.addValue(
+                            schema.getMeasurementId(),
+                            tablet.rowSize,
+                            tsPrimitiveType.getStringValue());
+                        break;
+                    }
+                  }
+                }
+                tablet.rowSize++;
+                if (tablet.rowSize > MAX_TABLET_LENGTH) {
+                  session.insertAlignedTablet(tablet);
+                  tablet.reset();
+                }
+              }
+              if (tablet.rowSize > 0) {
+                session.insertAlignedTablet(tablet);
+                tablet.reset();
+              }
+            }
           }
         }
       }
@@ -704,9 +763,10 @@ public class RewriteTsFileTool {
   protected static void writeSingleSeries(
       String device, MultiTsFileDeviceIterator.MeasurementIterator 
seriesIterator, Session session)
       throws IllegalPathException {
-    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+    //    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
     LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
         seriesIterator.getMetadataListForCurrentSeries();
+    String series = seriesIterator.nextSeries();
     while (!readerAndChunkMetadataList.isEmpty()) {
       Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair =
           readerAndChunkMetadataList.removeFirst();
@@ -714,16 +774,44 @@ public class RewriteTsFileTool {
       List<ChunkMetadata> chunkMetadataList = readerMetadataPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         try {
-          writeSingleChunk(device, p, chunkMetadata, reader, session);
+          //          if (!writeAsAligned) {
+          //            writeSingleChunk(device, p, chunkMetadata, reader, 
session);
+          //          } else {
+          cacheForAligned(device, series, chunkMetadata, reader);
+          //          }
         } catch (Throwable t) {
           // this is a broken chunk, skip it
           t.printStackTrace();
-          System.out.printf("Skip broken chunk in device %s.%s%n", device, 
p.getMeasurement());
+          System.out.printf("Skip broken chunk in device %s.%s%n", device, 
series);
         }
       }
     }
   }
 
+  protected static void cacheForAligned( // buffers one chunk's points in timeValuePairMap and records its schema in schemaMap
+      String device, String measurement, ChunkMetadata chunkMetadata, 
TsFileSequenceReader reader) // device is not referenced inside this method body
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata); // load the chunk described by chunkMetadata
+    ChunkHeader chunkHeader = chunk.getHeader();
+    MeasurementSchema schema =
+        new MeasurementSchema(
+            measurement,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            CompressionType.GZIP); // compression is fixed to GZIP here; data type and encoding come from the chunk header
+    schemaMap.computeIfAbsent(measurement, x -> schema); // only the first schema seen for a measurement is kept
+    IChunkReader chunkReader = new ChunkReader(chunk, null); // NOTE(review): null is presumably "no filter" - confirm
+    while (chunkReader.hasNextSatisfiedPage()) {
+      IPointReader batchIterator = 
chunkReader.nextPageData().getBatchDataIterator();
+      while (batchIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
+        timeValuePairMap
+            .computeIfAbsent(timeValuePair.getTimestamp(), x -> new 
HashMap<>())
+            .put(measurement, timeValuePair.getValue()); // a later value for the same timestamp and measurement replaces the earlier one
+      }
+    }
+  }
+
   /** Read and write a single chunk for not aligned series. */
   protected static void writeSingleChunk(
       String device,

Reply via email to