This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4fbb1c699 Avoid modification causing OOM when there are many
deletion entries in the mods file
7b4fbb1c699 is described below
commit 7b4fbb1c699c092a965e42ada932f70a8a854ceb
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Oct 26 16:09:39 2023 +0800
Avoid modification causing OOM when there are many deletion entries in the mods
file
---
.../execution/fragment/QueryContext.java | 34 +++++++---------------
.../schemaregion/utils/ResourceByPathUtils.java | 10 +++++--
.../metadata/DiskAlignedChunkMetadataLoader.java | 14 ++++-----
.../chunk/metadata/DiskChunkMetadataLoader.java | 13 +++++----
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 27 ++++++++++-------
5 files changed, 49 insertions(+), 49 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 60ee9baeaac..3b9b6046574 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -33,17 +33,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/** QueryContext contains the shared information with in a query. */
public class QueryContext {
- /**
- * The outer key is the path of a ModificationFile, the inner key in the
name of a timeseries and
- * the value is the Modifications of a timeseries in this file.
- */
- private final Map<String, Map<String, List<Modification>>> filePathModCache =
- new ConcurrentHashMap<>();
/**
* The key is the path of a ModificationFile and the value is all
Modifications in this file. We
* use this field because each call of Modification.getModifications()
return a copy of the
@@ -86,22 +79,17 @@ public class QueryContext {
if (!modFile.exists()) {
return Collections.emptyList();
}
- Map<String, List<Modification>> fileModifications =
- filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new
ConcurrentHashMap<>());
- return fileModifications.computeIfAbsent(
- path.getFullPath(),
- k -> {
- PatternTreeMap<Modification, ModsSerializer> allModifications =
- fileModCache.get(modFile.getFilePath());
- if (allModifications == null) {
- allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
- for (Modification modification : modFile.getModificationsIter()) {
- allModifications.append(modification.getPath(), modification);
- }
- fileModCache.put(modFile.getFilePath(), allModifications);
- }
- return
ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
- });
+
+ PatternTreeMap<Modification, ModsSerializer> allModifications =
+ fileModCache.get(modFile.getFilePath());
+ if (allModifications == null) {
+ allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
+ for (Modification modification : modFile.getModificationsIter()) {
+ allModifications.append(modification.getPath(), modification);
+ }
+ fileModCache.put(modFile.getFilePath(), allModifications);
+ }
+ return ModificationFile.sortAndMerge(allModifications.getOverlapped(path));
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index f17c3f90da5..b041ef0148d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -140,8 +140,10 @@ class AlignedResourceByPathUtils extends
ResourceByPathUtils {
}
boolean[] exist = new boolean[partialPath.getSchemaList().size()];
+ boolean modified = false;
for (IChunkMetadata chunkMetadata : chunkMetadataList) {
AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata)
chunkMetadata;
+ modified = (modified || alignedChunkMetadata.isModified());
timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics());
for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
if (alignedChunkMetadata.getValueChunkMetadataList().get(i) != null) {
@@ -172,7 +174,9 @@ class AlignedResourceByPathUtils extends
ResourceByPathUtils {
}
}
}
+
timeTimeSeriesMetadata.setStatistics(timeStatistics);
+ timeTimeSeriesMetadata.setModified(modified);
for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
if (!exist[i]) {
@@ -318,8 +322,7 @@ class MeasurementResourceByPathUtils extends
ResourceByPathUtils {
*/
@Override
public ITimeSeriesMetadata generateTimeSeriesMetadata(
- List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList)
- throws IOException {
+ List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata>
chunkMetadataList) {
TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata();
timeSeriesMetadata.setMeasurementId(partialPath.getMeasurementSchema().getMeasurementId());
timeSeriesMetadata.setTsDataType(partialPath.getMeasurementSchema().getType());
@@ -329,7 +332,9 @@ class MeasurementResourceByPathUtils extends
ResourceByPathUtils {
Statistics<? extends Serializable> seriesStatistics =
Statistics.getStatsByType(timeSeriesMetadata.getTsDataType());
// flush chunkMetadataList one by one
+ boolean isModified = false;
for (IChunkMetadata chunkMetadata : chunkMetadataList) {
+ isModified = (isModified || chunkMetadata.isModified());
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
@@ -339,6 +344,7 @@ class MeasurementResourceByPathUtils extends
ResourceByPathUtils {
}
}
timeSeriesMetadata.setStatistics(seriesStatistics);
+ timeSeriesMetadata.setModified(isModified);
return timeSeriesMetadata;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
index 95dbfd6e4e2..9dfbf5a897e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java
@@ -19,7 +19,6 @@
package
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata;
-import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -46,7 +45,6 @@ import static
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOA
public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader {
private final TsFileResource resource;
- private final AlignedPath seriesPath;
private final QueryContext context;
// time filter or value filter, only used to check time range
private final Filter filter;
@@ -57,21 +55,24 @@ public class DiskAlignedChunkMetadataLoader implements
IChunkMetadataLoader {
// it's only exact while using limit & offset push down
private final boolean queryAllSensors;
+ // all sub sensors' modifications
+ private final List<List<Modification>> pathModifications;
+
private static final Logger DEBUG_LOGGER =
LoggerFactory.getLogger("QUERY_DEBUG");
private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
SeriesScanCostMetricSet.getInstance();
public DiskAlignedChunkMetadataLoader(
TsFileResource resource,
- AlignedPath seriesPath,
QueryContext context,
Filter filter,
- boolean queryAllSensors) {
+ boolean queryAllSensors,
+ List<List<Modification>> pathModifications) {
this.resource = resource;
- this.seriesPath = seriesPath;
this.context = context;
this.filter = filter;
this.queryAllSensors = queryAllSensors;
+ this.pathModifications = pathModifications;
}
@Override
@@ -82,9 +83,6 @@ public class DiskAlignedChunkMetadataLoader implements
IChunkMetadataLoader {
((AlignedTimeSeriesMetadata)
timeSeriesMetadata).getCopiedChunkMetadataList();
final long t2 = System.nanoTime();
- // get all sub sensors' modifications
- List<List<Modification>> pathModifications =
- context.getPathModifications(resource.getModFile(), seriesPath);
if (context.isDebug()) {
DEBUG_LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 1a431895d1d..b33a7e6fcc6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -19,7 +19,6 @@
package
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -44,21 +43,25 @@ import static
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOA
public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
private final TsFileResource resource;
- private final PartialPath seriesPath;
private final QueryContext context;
// time filter or value filter, only used to check time range
private final Filter filter;
+ private final List<Modification> pathModifications;
+
private static final Logger DEBUG_LOGGER =
LoggerFactory.getLogger("QUERY_DEBUG");
private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
SeriesScanCostMetricSet.getInstance();
public DiskChunkMetadataLoader(
- TsFileResource resource, PartialPath seriesPath, QueryContext context,
Filter filter) {
+ TsFileResource resource,
+ QueryContext context,
+ Filter filter,
+ List<Modification> pathModifications) {
this.resource = resource;
- this.seriesPath = seriesPath;
this.context = context;
this.filter = filter;
+ this.pathModifications = pathModifications;
}
@Override
@@ -69,8 +72,6 @@ public class DiskChunkMetadataLoader implements
IChunkMetadataLoader {
((TimeseriesMetadata)
timeSeriesMetadata).getCopiedChunkMetadataList();
final long t2 = System.nanoTime();
- List<Modification> pathModifications =
- context.getPathModifications(resource.getModFile(), seriesPath);
if (context.isDebug()) {
DEBUG_LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index eccb532e8df..fe25fd0688a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -149,8 +149,11 @@ public class FileLoaderUtils {
resource.getTimeIndexType() != 1,
context.isDebug());
if (timeSeriesMetadata != null) {
+ List<Modification> pathModifications =
+ context.getPathModifications(resource.getModFile(), seriesPath);
+ timeSeriesMetadata.setModified(!pathModifications.isEmpty());
timeSeriesMetadata.setChunkMetadataLoader(
- new DiskChunkMetadataLoader(resource, seriesPath, context,
filter));
+ new DiskChunkMetadataLoader(resource, context, filter,
pathModifications));
}
} else { // if the tsfile is unclosed, we just get it directly from
TsFileResource
loadFromMem = true;
@@ -165,9 +168,6 @@ public class FileLoaderUtils {
if (timeSeriesMetadata != null) {
long t2 = System.nanoTime();
try {
- List<Modification> pathModifications =
- context.getPathModifications(resource.getModFile(), seriesPath);
- timeSeriesMetadata.setModified(!pathModifications.isEmpty());
if (timeSeriesMetadata.getStatistics().getStartTime()
> timeSeriesMetadata.getStatistics().getEndTime()) {
return null;
@@ -224,6 +224,7 @@ public class FileLoaderUtils {
alignedTimeSeriesMetadata.setChunkMetadataLoader(
new MemAlignedChunkMetadataLoader(
resource, alignedPath, context, filter, queryAllSensors));
+ // mem's modification already done in generating chunkmetadata
}
}
@@ -240,9 +241,6 @@ public class FileLoaderUtils {
alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()))
{
return null;
}
-
- // set modifications to each aligned path
- setModifications(resource, alignedTimeSeriesMetadata, alignedPath,
context);
} finally {
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() -
t2);
@@ -292,7 +290,7 @@ public class FileLoaderUtils {
new AlignedTimeSeriesMetadata(timeColumn, Collections.emptyList());
alignedTimeSeriesMetadata.setChunkMetadataLoader(
new DiskAlignedChunkMetadataLoader(
- resource, alignedPath, context, filter, queryAllSensors));
+ resource, context, filter, queryAllSensors,
Collections.emptyList()));
} else {
List<TimeseriesMetadata> valueTimeSeriesMetadataList =
new ArrayList<>(valueMeasurementList.size());
@@ -309,24 +307,29 @@ public class FileLoaderUtils {
valueTimeSeriesMetadataList.add(valueColumn);
}
if (exist) {
+ // set modifications to each aligned path
alignedTimeSeriesMetadata =
new AlignedTimeSeriesMetadata(timeColumn,
valueTimeSeriesMetadataList);
+ List<List<Modification>> pathModifications =
+ setModifications(resource, alignedTimeSeriesMetadata,
alignedPath, context);
+
alignedTimeSeriesMetadata.setChunkMetadataLoader(
new DiskAlignedChunkMetadataLoader(
- resource, alignedPath, context, filter, queryAllSensors));
+ resource, context, filter, queryAllSensors,
pathModifications));
}
}
}
return alignedTimeSeriesMetadata;
}
- private static void setModifications(
+ private static List<List<Modification>> setModifications(
TsFileResource resource,
AlignedTimeSeriesMetadata alignedTimeSeriesMetadata,
AlignedPath alignedPath,
QueryContext context) {
List<TimeseriesMetadata> valueTimeSeriesMetadataList =
alignedTimeSeriesMetadata.getValueTimeseriesMetadataList();
+ List<List<Modification>> res = new ArrayList<>();
boolean modified = false;
for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
if (valueTimeSeriesMetadataList.get(i) != null) {
@@ -334,10 +337,14 @@ public class FileLoaderUtils {
context.getPathModifications(
resource.getModFile(), alignedPath.getPathWithMeasurement(i));
valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty());
+ res.add(pathModifications);
modified = (modified || !pathModifications.isEmpty());
+ } else {
+ res.add(Collections.emptyList());
}
}
alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified);
+ return res;
}
/**