This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4fbb1c699 Avoid modifications causing OOM when there are many 
deletion entries in the mods file
7b4fbb1c699 is described below

commit 7b4fbb1c699c092a965e42ada932f70a8a854ceb
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Oct 26 16:09:39 2023 +0800

    Avoid modifications causing OOM when there are many deletion entries in 
the mods file
---
 .../execution/fragment/QueryContext.java           | 34 +++++++---------------
 .../schemaregion/utils/ResourceByPathUtils.java    | 10 +++++--
 .../metadata/DiskAlignedChunkMetadataLoader.java   | 14 ++++-----
 .../chunk/metadata/DiskChunkMetadataLoader.java    | 13 +++++----
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 27 ++++++++++-------
 5 files changed, 49 insertions(+), 49 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 60ee9baeaac..3b9b6046574 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -33,17 +33,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /** QueryContext contains the shared information with in a query. */
 public class QueryContext {
 
-  /**
-   * The outer key is the path of a ModificationFile, the inner key in the 
name of a timeseries and
-   * the value is the Modifications of a timeseries in this file.
-   */
-  private final Map<String, Map<String, List<Modification>>> filePathModCache =
-      new ConcurrentHashMap<>();
   /**
    * The key is the path of a ModificationFile and the value is all 
Modifications in this file. We
    * use this field because each call of Modification.getModifications() 
return a copy of the
@@ -86,22 +79,17 @@ public class QueryContext {
     if (!modFile.exists()) {
       return Collections.emptyList();
     }
-    Map<String, List<Modification>> fileModifications =
-        filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new 
ConcurrentHashMap<>());
-    return fileModifications.computeIfAbsent(
-        path.getFullPath(),
-        k -> {
-          PatternTreeMap<Modification, ModsSerializer> allModifications =
-              fileModCache.get(modFile.getFilePath());
-          if (allModifications == null) {
-            allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
-            for (Modification modification : modFile.getModificationsIter()) {
-              allModifications.append(modification.getPath(), modification);
-            }
-            fileModCache.put(modFile.getFilePath(), allModifications);
-          }
-          return 
ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
-        });
+
+    PatternTreeMap<Modification, ModsSerializer> allModifications =
+        fileModCache.get(modFile.getFilePath());
+    if (allModifications == null) {
+      allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
+      for (Modification modification : modFile.getModificationsIter()) {
+        allModifications.append(modification.getPath(), modification);
+      }
+      fileModCache.put(modFile.getFilePath(), allModifications);
+    }
+    return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index f17c3f90da5..b041ef0148d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -140,8 +140,10 @@ class AlignedResourceByPathUtils extends 
ResourceByPathUtils {
     }
 
     boolean[] exist = new boolean[partialPath.getSchemaList().size()];
+    boolean modified = false;
     for (IChunkMetadata chunkMetadata : chunkMetadataList) {
       AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) 
chunkMetadata;
+      modified = (modified || alignedChunkMetadata.isModified());
       
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
       for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
         if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) {
@@ -172,7 +174,9 @@ class AlignedResourceByPathUtils extends 
ResourceByPathUtils {
         }
       }
     }
+
     timeTimeSeriesMetadata.setStatistics(timeStatistics);
+    timeTimeSeriesMetadata.setModified(modified);
 
     for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
       if (!exist[i]) {
@@ -318,8 +322,7 @@ class MeasurementResourceByPathUtils extends 
ResourceByPathUtils {
    */
   @Override
   public ITimeSeriesMetadata generateTimeSeriesMetadata(
-      List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> 
chunkMetadataList)
-      throws IOException {
+      List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> 
chunkMetadataList) {
     TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
     
timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId());
     
timeSeriesMetadata.setTsDataType(partialPath.getMeasurementSchema().getType());
@@ -329,7 +332,9 @@ class MeasurementResourceByPathUtils extends 
ResourceByPathUtils {
     Statistics<? extends Serializable> seriesStatistics =
         Statistics.getStatsByType(timeSeriesMetadata.getTsDataType());
     // flush chunkMetadataList one by one
+    boolean isModified = false;
     for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+      isModified = (isModified || chunkMetadata.isModified());
       seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
     }
 
@@ -339,6 +344,7 @@ class MeasurementResourceByPathUtils extends 
ResourceByPathUtils {
       }
     }
     timeSeriesMetadata.setStatistics(seriesStatistics);
+    timeSeriesMetadata.setModified(isModified);
     return timeSeriesMetadata;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
index 95dbfd6e4e2..9dfbf5a897e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
@@ -19,7 +19,6 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata;
 
-import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -46,7 +45,6 @@ import static 
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOA
 public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader {
 
   private final TsFileResource resource;
-  private final AlignedPath seriesPath;
   private final QueryContext context;
   // time filter or value filter, only used to check time range
   private final Filter filter;
@@ -57,21 +55,24 @@ public class DiskAlignedChunkMetadataLoader implements 
IChunkMetadataLoader {
   // it's only exact while using limit & offset push down
   private final boolean queryAllSensors;
 
+  // all sub sensors' modifications
+  private final List<List<Modification>> pathModifications;
+
   private static final Logger DEBUG_LOGGER = 
LoggerFactory.getLogger("QUERY_DEBUG");
   private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
       SeriesScanCostMetricSet.getInstance();
 
   public DiskAlignedChunkMetadataLoader(
       TsFileResource resource,
-      AlignedPath seriesPath,
       QueryContext context,
       Filter filter,
-      boolean queryAllSensors) {
+      boolean queryAllSensors,
+      List<List<Modification>> pathModifications) {
     this.resource = resource;
-    this.seriesPath = seriesPath;
     this.context = context;
     this.filter = filter;
     this.queryAllSensors = queryAllSensors;
+    this.pathModifications = pathModifications;
   }
 
   @Override
@@ -82,9 +83,6 @@ public class DiskAlignedChunkMetadataLoader implements 
IChunkMetadataLoader {
           ((AlignedTimeSeriesMetadata) 
timeSeriesMetadata).getCopiedChunkMetadataList();
 
       final long t2 = System.nanoTime();
-      // get all sub sensors' modifications
-      List<List<Modification>> pathModifications =
-          context.getPathModifications(resource.getModFile(), seriesPath);
 
       if (context.isDebug()) {
         DEBUG_LOGGER.info(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 1a431895d1d..b33a7e6fcc6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -19,7 +19,6 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -44,21 +43,25 @@ import static 
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOA
 public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
 
   private final TsFileResource resource;
-  private final PartialPath seriesPath;
   private final QueryContext context;
   // time filter or value filter, only used to check time range
   private final Filter filter;
 
+  private final List<Modification> pathModifications;
+
   private static final Logger DEBUG_LOGGER = 
LoggerFactory.getLogger("QUERY_DEBUG");
   private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
       SeriesScanCostMetricSet.getInstance();
 
   public DiskChunkMetadataLoader(
-      TsFileResource resource, PartialPath seriesPath, QueryContext context, 
Filter filter) {
+      TsFileResource resource,
+      QueryContext context,
+      Filter filter,
+      List<Modification> pathModifications) {
     this.resource = resource;
-    this.seriesPath = seriesPath;
     this.context = context;
     this.filter = filter;
+    this.pathModifications = pathModifications;
   }
 
   @Override
@@ -69,8 +72,6 @@ public class DiskChunkMetadataLoader implements 
IChunkMetadataLoader {
           ((TimeseriesMetadata) 
timeSeriesMetadata).getCopiedChunkMetadataList();
 
       final long t2 = System.nanoTime();
-      List<Modification> pathModifications =
-          context.getPathModifications(resource.getModFile(), seriesPath);
 
       if (context.isDebug()) {
         DEBUG_LOGGER.info(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index eccb532e8df..fe25fd0688a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -149,8 +149,11 @@ public class FileLoaderUtils {
                     resource.getTimeIndexType() != 1,
                     context.isDebug());
         if (timeSeriesMetadata != null) {
+          List<Modification> pathModifications =
+              context.getPathModifications(resource.getModFile(), seriesPath);
+          timeSeriesMetadata.setModified(!pathModifications.isEmpty());
           timeSeriesMetadata.setChunkMetadataLoader(
-              new DiskChunkMetadataLoader(resource, seriesPath, context, 
filter));
+              new DiskChunkMetadataLoader(resource, context, filter, 
pathModifications));
         }
       } else { // if the tsfile is unclosed, we just get it directly from 
TsFileResource
         loadFromMem = true;
@@ -165,9 +168,6 @@ public class FileLoaderUtils {
       if (timeSeriesMetadata != null) {
         long t2 = System.nanoTime();
         try {
-          List<Modification> pathModifications =
-              context.getPathModifications(resource.getModFile(), seriesPath);
-          timeSeriesMetadata.setModified(!pathModifications.isEmpty());
           if (timeSeriesMetadata.getStatistics().getStartTime()
               > timeSeriesMetadata.getStatistics().getEndTime()) {
             return null;
@@ -224,6 +224,7 @@ public class FileLoaderUtils {
           alignedTimeSeriesMetadata.setChunkMetadataLoader(
               new MemAlignedChunkMetadataLoader(
                   resource, alignedPath, context, filter, queryAllSensors));
+          // mem's modification already done in generating chunkmetadata
         }
       }
 
@@ -240,9 +241,6 @@ public class FileLoaderUtils {
                   
alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()))
 {
             return null;
           }
-
-          // set modifications to each aligned path
-          setModifications(resource, alignedTimeSeriesMetadata, alignedPath, 
context);
         } finally {
           SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
               TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() - 
t2);
@@ -292,7 +290,7 @@ public class FileLoaderUtils {
             new AlignedTimeSeriesMetadata(timeColumn, Collections.emptyList());
         alignedTimeSeriesMetadata.setChunkMetadataLoader(
             new DiskAlignedChunkMetadataLoader(
-                resource, alignedPath, context, filter, queryAllSensors));
+                resource, context, filter, queryAllSensors, 
Collections.emptyList()));
       } else {
         List<TimeseriesMetadata> valueTimeSeriesMetadataList =
             new ArrayList<>(valueMeasurementList.size());
@@ -309,24 +307,29 @@ public class FileLoaderUtils {
           valueTimeSeriesMetadataList.add(valueColumn);
         }
         if (exist) {
+          // set modifications to each aligned path
           alignedTimeSeriesMetadata =
               new AlignedTimeSeriesMetadata(timeColumn, 
valueTimeSeriesMetadataList);
+          List<List<Modification>> pathModifications =
+              setModifications(resource, alignedTimeSeriesMetadata, 
alignedPath, context);
+
           alignedTimeSeriesMetadata.setChunkMetadataLoader(
               new DiskAlignedChunkMetadataLoader(
-                  resource, alignedPath, context, filter, queryAllSensors));
+                  resource, context, filter, queryAllSensors, 
pathModifications));
         }
       }
     }
     return alignedTimeSeriesMetadata;
   }
 
-  private static void setModifications(
+  private static List<List<Modification>> setModifications(
       TsFileResource resource,
       AlignedTimeSeriesMetadata alignedTimeSeriesMetadata,
       AlignedPath alignedPath,
       QueryContext context) {
     List<TimeseriesMetadata> valueTimeSeriesMetadataList =
         alignedTimeSeriesMetadata.getValueTimeseriesMetadataList();
+    List<List<Modification>> res = new ArrayList<>();
     boolean modified = false;
     for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
       if (valueTimeSeriesMetadataList.get(i) != null) {
@@ -334,10 +337,14 @@ public class FileLoaderUtils {
             context.getPathModifications(
                 resource.getModFile(), alignedPath.getPathWithMeasurement(i));
         
valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty());
+        res.add(pathModifications);
         modified = (modified || !pathModifications.isEmpty());
+      } else {
+        res.add(Collections.emptyList());
       }
     }
     alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified);
+    return res;
   }
 
   /**

Reply via email to