This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3813
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3813 by this push:
     new c7b002ce8b remove access to meta manager
c7b002ce8b is described below

commit c7b002ce8b4b50f10ab2aa321f19da97b39c6da2
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jul 26 22:17:21 2022 +0800

    remove access to meta manager
---
 .../db/engine/compaction/CompactionUtils.java      | 136 ++++++++++++++++++---
 .../manage/CrossSpaceCompactionResource.java       |   6 -
 .../cross/rewrite/task/SubCompactionTask.java      |  19 ++-
 .../inner/utils/InnerSpaceCompactionUtils.java     |  11 +-
 .../utils/SingleSeriesCompactionExecutor.java      |  26 +++-
 5 files changed, 148 insertions(+), 50 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index c51d2dab98..839b67733c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction;
 
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
 import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
@@ -33,24 +34,28 @@ import 
org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
@@ -61,6 +66,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,6 +97,7 @@ public class CompactionUtils {
     QueryResourceManager.getInstance()
         .getQueryFileManager()
         .addUsedFilesForQuery(queryId, queryDataSource);
+    Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new HashMap<>();
 
     try (AbstractCompactionWriter compactionWriter =
         getCompactionWriter(seqFileResources, unseqFileResources, 
targetFileResources)) {
@@ -106,10 +113,20 @@ public class CompactionUtils {
 
         if (isAligned) {
           compactAlignedSeries(
-              device, deviceIterator, compactionWriter, queryContext, 
queryDataSource);
+              device,
+              deviceIterator,
+              compactionWriter,
+              queryContext,
+              queryDataSource,
+              readerCacheMap);
         } else {
           compactNonAlignedSeries(
-              device, deviceIterator, compactionWriter, queryContext, 
queryDataSource);
+              device,
+              deviceIterator,
+              compactionWriter,
+              queryContext,
+              queryDataSource,
+              readerCacheMap);
         }
       }
 
@@ -117,6 +134,7 @@ public class CompactionUtils {
       updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter);
       updatePlanIndexes(targetFileResources, seqFileResources, 
unseqFileResources);
     } finally {
+      clearReaderCache(readerCacheMap);
       QueryResourceManager.getInstance().endQuery(queryId);
     }
   }
@@ -126,21 +144,20 @@ public class CompactionUtils {
       MultiTsFileDeviceIterator deviceIterator,
       AbstractCompactionWriter compactionWriter,
       QueryContext queryContext,
-      QueryDataSource queryDataSource)
+      QueryDataSource queryDataSource,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
       throws IOException, MetadataException {
     MultiTsFileDeviceIterator.AlignedMeasurementIterator 
alignedMeasurementIterator =
         deviceIterator.iterateAlignedSeries(device);
     Set<String> allMeasurements = 
alignedMeasurementIterator.getAllMeasurements();
-    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-    for (String measurement : allMeasurements) {
-      // TODO: use IDTable
-      try {
-        measurementSchemas.add(
-            IoTDB.metaManager.getSeriesSchema(new PartialPath(device, 
measurement)));
-      } catch (PathNotExistException e) {
-        logger.info("A deleted path is skipped: {}", e.getMessage());
-      }
-    }
+    Map<String, MeasurementSchema> schemaMap =
+        getMeasurementSchema(
+            device,
+            allMeasurements,
+            queryDataSource.getSeqResources(),
+            queryDataSource.getUnseqResources(),
+            readerCacheMap);
+    List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>(schemaMap.values());
     if (measurementSchemas.isEmpty()) {
       return;
     }
@@ -173,12 +190,20 @@ public class CompactionUtils {
       MultiTsFileDeviceIterator deviceIterator,
       AbstractCompactionWriter compactionWriter,
       QueryContext queryContext,
-      QueryDataSource queryDataSource)
-      throws IOException, InterruptedException {
+      QueryDataSource queryDataSource,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
+      throws IOException, InterruptedException, IllegalPathException {
     MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
         deviceIterator.iterateNotAlignedSeries(device, false);
     Set<String> allMeasurements = measurementIterator.getAllMeasurements();
     int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+    Map<String, MeasurementSchema> schemaMap =
+        getMeasurementSchema(
+            device,
+            allMeasurements,
+            queryDataSource.getSeqResources(),
+            queryDataSource.getUnseqResources(),
+            readerCacheMap);
 
     // assign all measurements to different sub tasks
     Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
@@ -203,6 +228,7 @@ public class CompactionUtils {
                       queryContext,
                       queryDataSource,
                       compactionWriter,
+                      schemaMap,
                       i)));
     }
 
@@ -220,6 +246,60 @@ public class CompactionUtils {
     compactionWriter.endChunkGroup();
   }
 
+  private static Map<String, MeasurementSchema> getMeasurementSchema(
+      String device,
+      Set<String> measurements,
+      List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap)
+      throws IllegalPathException, IOException {
+    HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
+    List<TsFileResource> allResources = new LinkedList<>(seqFiles);
+    allResources.addAll(unseqFiles);
+    // sort the tsfile by version, so that we can iterate the tsfile from the 
newest to oldest
+    allResources.sort(
+        (o1, o2) -> {
+          try {
+            TsFileNameGenerator.TsFileName n1 =
+                TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+            TsFileNameGenerator.TsFileName n2 =
+                TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+            return (int) (n2.getVersion() - n1.getVersion());
+          } catch (IOException e) {
+            return 0;
+          }
+        });
+    for (String measurement : measurements) {
+      for (TsFileResource tsFileResource : allResources) {
+        if (!tsFileResource.mayContainsDevice(device)) {
+          continue;
+        }
+        MeasurementSchema schema =
+            getMeasurementSchemaFromReader(
+                tsFileResource,
+                readerCacheMap.computeIfAbsent(
+                    tsFileResource,
+                    x -> {
+                      try {
+                        
FileReaderManager.getInstance().increaseFileReaderReference(x, true);
+                        return 
FileReaderManager.getInstance().get(x.getTsFilePath(), true);
+                      } catch (IOException e) {
+                        throw new RuntimeException(
+                            String.format(
+                                "Failed to construct sequence reader for %s", 
tsFileResource));
+                      }
+                    }),
+                device,
+                measurement);
+        if (schema != null) {
+          schemaMap.put(measurement, schema);
+          break;
+        }
+      }
+    }
+    return schemaMap;
+  }
+
   public static void writeWithReader(
       AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) 
throws IOException {
     while (reader.hasNextBatch()) {
@@ -231,6 +311,21 @@ public class CompactionUtils {
     }
   }
 
+  private static MeasurementSchema getMeasurementSchemaFromReader(
+      TsFileResource resource, TsFileSequenceReader reader, String device, 
String measurement)
+      throws IllegalPathException, IOException {
+    List<ChunkMetadata> chunkMetadata =
+        reader.getChunkMetadataList(new PartialPath(device, measurement));
+    if (chunkMetadata.size() > 0) {
+      chunkMetadata.get(0).setFilePath(resource.getTsFilePath());
+      Chunk chunk = ChunkCache.getInstance().get(chunkMetadata.get(0));
+      ChunkHeader header = chunk.getHeader();
+      return new MeasurementSchema(
+          measurement, header.getDataType(), header.getEncodingType(), 
header.getCompressionType());
+    }
+    return null;
+  }
+
   /**
    * @param measurementIds if device is aligned, then measurementIds contain 
all measurements. If
    *     device is not aligned, then measurementIds only contain one 
measurement.
@@ -433,4 +528,11 @@ public class CompactionUtils {
               "[Compaction] compaction for target file %s abort", 
tsFileResource.toString()));
     }
   }
+
+  private static void clearReaderCache(Map<TsFileResource, 
TsFileSequenceReader> readerCacheMap)
+      throws IOException {
+    for (TsFileResource resource : readerCacheMap.keySet()) {
+      FileReaderManager.getInstance().decreaseFileReaderReference(resource, 
true);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index a19fef77f6..7ced2fbcfc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,9 +21,7 @@ package 
org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
 
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -111,10 +109,6 @@ public class CrossSpaceCompactionResource {
     chunkWriterCache.clear();
   }
 
-  public IMeasurementSchema getSchema(PartialPath path) throws 
MetadataException {
-    return IoTDB.metaManager.getSeriesSchema(path);
-  }
-
   /**
    * Construct the a new or get an existing TsFileSequenceReader of a TsFile.
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
index 08f63c6e8b..f95fec9033 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java
@@ -22,19 +22,17 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -51,6 +49,7 @@ public class SubCompactionTask implements Callable<Void> {
   private final QueryContext queryContext;
   private final QueryDataSource queryDataSource;
   private final AbstractCompactionWriter compactionWriter;
+  private final Map<String, MeasurementSchema> schemaMap;
   private final int taskId;
 
   public SubCompactionTask(
@@ -59,26 +58,22 @@ public class SubCompactionTask implements Callable<Void> {
       QueryContext queryContext,
       QueryDataSource queryDataSource,
       AbstractCompactionWriter compactionWriter,
+      Map<String, MeasurementSchema> schemaMap,
       int taskId) {
     this.device = device;
     this.measurementList = measurementList;
     this.queryContext = queryContext;
     this.queryDataSource = queryDataSource;
     this.compactionWriter = compactionWriter;
+    this.schemaMap = schemaMap;
     this.taskId = taskId;
   }
 
   @Override
   public Void call() throws Exception {
     for (String measurement : measurementList) {
-      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-      try {
-        measurementSchemas.add(
-            IoTDB.metaManager.getSeriesSchema(new PartialPath(device, 
measurement)));
-      } catch (PathNotExistException e) {
-        logger.info("A deleted path is skipped: {}", e.getMessage());
-        continue;
-      }
+      List<IMeasurementSchema> measurementSchemas =
+          Collections.singletonList(schemaMap.get(measurement));
 
       IBatchReader dataBatchReader =
           CompactionUtils.constructReader(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index cf402099c7..125031ad1f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -31,10 +31,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -119,15 +117,8 @@ public class InnerSpaceCompactionUtils {
       // dead-loop.
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList =
           seriesIterator.getMetadataListForCurrentSeries();
-      try {
-        measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
-      } catch (PathNotExistException e) {
-        logger.info("A deleted path is skipped: {}", e.getMessage());
-        continue;
-      }
       SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
-          new SingleSeriesCompactionExecutor(
-              p, measurementSchema, readerAndChunkMetadataList, writer, 
targetResource);
+          new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, 
writer, targetResource);
       compactionExecutorOfCurrentTimeSeries.execute();
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index cf9a04f41d..2d49094f44 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -36,6 +37,7 @@ import 
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import com.google.common.util.concurrent.RateLimiter;
@@ -46,12 +48,12 @@ import java.util.List;
 
 /** This class is used to compact one series during inner space compaction. */
 public class SingleSeriesCompactionExecutor {
+  private PartialPath series;
+  private IMeasurementSchema schema;
   private String device;
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList;
   private TsFileIOWriter fileWriter;
   private TsFileResource targetResource;
-
-  private IMeasurementSchema schema;
   private ChunkWriterImpl chunkWriter;
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
@@ -75,15 +77,14 @@ public class SingleSeriesCompactionExecutor {
 
   public SingleSeriesCompactionExecutor(
       PartialPath series,
-      IMeasurementSchema measurementSchema,
       LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList,
       TsFileIOWriter fileWriter,
       TsFileResource targetResource) {
     this.device = series.getDevice();
     this.readerAndChunkMetadataList = readerAndChunkMetadataList;
     this.fileWriter = fileWriter;
-    this.schema = measurementSchema;
-    this.chunkWriter = new ChunkWriterImpl(this.schema);
+    this.series = series;
+    this.chunkWriter = null;
     this.cachedChunk = null;
     this.cachedChunkMetadata = null;
     this.targetResource = targetResource;
@@ -101,6 +102,10 @@ public class SingleSeriesCompactionExecutor {
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
+        if (chunkWriter == null) {
+          constructChunkWriterFromReadChunk(currentChunk);
+        }
+
         CompactionMetricsManager.recordReadInfo(
             currentChunk.getHeader().getSerializedSize() + 
currentChunk.getHeader().getDataSize());
 
@@ -135,6 +140,17 @@ public class SingleSeriesCompactionExecutor {
     targetResource.updateEndTime(device, maxEndTimestamp);
   }
 
+  private void constructChunkWriterFromReadChunk(Chunk chunk) {
+    ChunkHeader chunkHeader = chunk.getHeader();
+    this.schema =
+        new MeasurementSchema(
+            series.getMeasurement(),
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    this.chunkWriter = new ChunkWriterImpl(this.schema);
+  }
+
   private long getChunkSize(Chunk chunk) {
     return chunk.getHeader().getSerializedSize() + 
chunk.getHeader().getDataSize();
   }

Reply via email to