This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch debug-baowu
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0efd3230ca064d06be684097cdb072cc789034ea
Author: LiuXuxin <[email protected]>
AuthorDate: Thu Oct 27 22:13:01 2022 +0800

    temp
---
 .../inner/utils/InnerSpaceCompactionUtils.java     | 32 ++++++++++++++++++++--
 .../utils/SingleSeriesCompactionExecutor.java      | 28 ++++++++++++-------
 2 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index e21a5c3a57..16f319b3da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -33,13 +33,14 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -50,8 +51,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 public class InnerSpaceCompactionUtils {
 
@@ -99,7 +104,7 @@ public class InnerSpaceCompactionUtils {
 
   private static void checkThreadInterrupted(TsFileResource tsFileResource)
       throws InterruptedException {
-    if (Thread.interrupted() || !IoTDB.activated) {
+    if (Thread.interrupted()) {
       throw new InterruptedException(
           String.format(
               "[Compaction] compaction for target file %s abort", 
tsFileResource.toString()));
@@ -114,6 +119,8 @@ public class InnerSpaceCompactionUtils {
       throws IOException, MetadataException, InterruptedException {
     MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
         deviceIterator.iterateNotAlignedSeries(device, true);
+    Map<Long, Map<String, TsPrimitiveType>> valueMap = new TreeMap<>();
+    Map<String, IMeasurementSchema> schemaMap = new HashMap<>();
     while (seriesIterator.hasNextSeries()) {
       checkThreadInterrupted(targetResource);
       // TODO: we can provide a configuration item to enable concurrent 
between each series
@@ -125,9 +132,21 @@ public class InnerSpaceCompactionUtils {
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
-          new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, 
writer, targetResource);
+          new SingleSeriesCompactionExecutor(
+              p, readerAndChunkMetadataList, writer, targetResource, valueMap, 
schemaMap);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
+    List<IMeasurementSchema> schemaList = new ArrayList<>(schemaMap.values());
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(schemaList);
+    for (Map.Entry<Long, Map<String, TsPrimitiveType>> entry : 
valueMap.entrySet()) {
+      Map<String, TsPrimitiveType> vMap = entry.getValue();
+      TsPrimitiveType[] types = new TsPrimitiveType[schemaList.size()];
+      for (int i = 0; i < schemaList.size(); ++i) {
+        types[i] = vMap.get(schemaList.get(i).getMeasurementId());
+      }
+      chunkWriter.write(entry.getKey(), types);
+    }
+    chunkWriter.writeToFileWriter(writer);
     writer.checkMetadataSizeAndMayFlush();
   }
 
@@ -264,4 +283,11 @@ public class InnerSpaceCompactionUtils {
     targetResource.serialize();
     targetResource.close();
   }
+
+  public static void main(String[] args) throws Exception {
+    TsFileResource sourceResource =
+        new TsFileResource(new File("E:\\1664917514523-394-1-0.tsfile"));
+    TsFileResource targetResource = new TsFileResource(new 
File("E:\\Gzip-aligned.tsfile"));
+    InnerSpaceCompactionUtils.compact(targetResource, 
Collections.singletonList(sourceResource));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index 2d49094f44..32077d7c43 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -35,6 +36,7 @@ import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -43,8 +45,10 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import com.google.common.util.concurrent.RateLimiter;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 /** This class is used to compact one series during inner space compaction. */
 public class SingleSeriesCompactionExecutor {
@@ -75,11 +79,16 @@ public class SingleSeriesCompactionExecutor {
   private final boolean enableMetrics =
       MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric();
 
+  private Map<Long, Map<String, TsPrimitiveType>> valueMap;
+  private Map<String, IMeasurementSchema> schemaMap;
+
   public SingleSeriesCompactionExecutor(
       PartialPath series,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
       TsFileIOWriter fileWriter,
-      TsFileResource targetResource) {
+      TsFileResource targetResource,
+      Map<Long, Map<String, TsPrimitiveType>> valueMap,
+      Map<String, IMeasurementSchema> schemaMap) {
     this.device = series.getDevice();
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
@@ -88,6 +97,8 @@ public class SingleSeriesCompactionExecutor {
     this.cachedChunk = null;
     this.cachedChunkMetadata = null;
     this.targetResource = targetResource;
+    this.valueMap = valueMap;
+    this.schemaMap = schemaMap;
   }
 
   /**
@@ -110,7 +121,7 @@ public class SingleSeriesCompactionExecutor {
             currentChunk.getHeader().getSerializedSize() + 
currentChunk.getHeader().getDataSize());
 
         // if this chunk is modified, deserialize it into points
-        if (chunkMetadata.getDeleteIntervalList() != null) {
+        if (chunkMetadata.getStartTime() > 0) {
           processModifiedChunk(currentChunk);
           continue;
         }
@@ -147,7 +158,8 @@ public class SingleSeriesCompactionExecutor {
             series.getMeasurement(),
             chunkHeader.getDataType(),
             chunkHeader.getEncodingType(),
-            chunkHeader.getCompressionType());
+            CompressionType.GZIP);
+    schemaMap.put(series.getMeasurement(), schema);
     this.chunkWriter = new ChunkWriterImpl(this.schema);
   }
 
@@ -220,13 +232,9 @@ public class SingleSeriesCompactionExecutor {
       IPointReader batchIterator = 
chunkReader.nextPageData().getBatchDataIterator();
       while (batchIterator.hasNextTimeValuePair()) {
         TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
-        writeTimeAndValueToChunkWriter(timeValuePair);
-        if (timeValuePair.getTimestamp() > maxEndTimestamp) {
-          maxEndTimestamp = timeValuePair.getTimestamp();
-        }
-        if (timeValuePair.getTimestamp() < minStartTimestamp) {
-          minStartTimestamp = timeValuePair.getTimestamp();
-        }
+        valueMap
+            .computeIfAbsent(timeValuePair.getTimestamp(), x -> new 
HashMap<>())
+            .put(series.getMeasurement(), timeValuePair.getValue());
       }
     }
     pointCountInChunkWriter += chunk.getChunkStatistic().getCount();

Reply via email to