This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 39118785846 Optimize SeriesScanUtil by memorizing the order time and
satisfied information for each Seq and Unseq Resource (#12227)
39118785846 is described below
commit 39118785846342113704faa353bf01a7645bb763
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Mar 27 09:08:05 2024 +0800
Optimize SeriesScanUtil by memorizing the order time and satisfied
information for each Seq and Unseq Resource (#12227)
---
.../queryengine/execution/driver/DataDriver.java | 2 +
.../fragment/FragmentInstanceContext.java | 1 +
.../execution/operator/source/SeriesScanUtil.java | 88 +++++++--------
.../buffer/TimeSeriesMetadataCache.java | 6 +-
.../impl/ReadPointCompactionPerformer.java | 4 +-
.../dataregion/read/QueryDataSource.java | 120 ++++++++++++++++++---
.../iotdb/tsfile/file/metadata/PlainDeviceID.java | 2 +-
7 files changed, 155 insertions(+), 68 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index ea658fcaf2b..ee4a0ad6d41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -95,6 +95,8 @@ public class DataDriver extends Driver {
QueryDataSource queryDataSource =
new QueryDataSource(dataSource.getSeqResources(),
dataSource.getUnseqResources());
+ queryDataSource.setSingleDevice(dataSource.isSingleDevice());
+
queryDataSource.setDataTTL(dataSource.getDataTTL());
sourceOperator.initQueryDataSource(queryDataSource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index b752c398818..e3025bd1497 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -372,6 +372,7 @@ public class FragmentInstanceContext extends QueryContext {
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForQuery(sharedQueryDataSource);
+ sharedQueryDataSource.setSingleDevice(selectedDeviceIdSet.size() == 1);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 7f4a98c8ddc..670e07be567 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.PriorityM
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.IMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -68,6 +69,8 @@ public class SeriesScanUtil {
// The path of the target series which will be scanned.
protected final PartialPath seriesPath;
+
+ private final IDeviceID deviceID;
protected boolean isAligned = false;
private final TSDataType dataType;
@@ -113,6 +116,7 @@ public class SeriesScanUtil {
SeriesScanOptions scanOptions,
FragmentInstanceContext context) {
this.seriesPath = seriesPath;
+ this.deviceID = seriesPath.getIDeviceID();
this.dataType = seriesPath.getSeriesType();
this.scanOptions = scanOptions;
@@ -155,7 +159,7 @@ public class SeriesScanUtil {
* @param dataSource the query data source
*/
public void initQueryDataSource(QueryDataSource dataSource) {
- dataSource.fillOrderIndexes(seriesPath.getDevice(),
orderUtils.getAscending());
+ dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending());
this.dataSource = dataSource;
// updated filter concerning TTL
@@ -1116,12 +1120,10 @@ public class SeriesScanUtil {
private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long
endpointTime)
throws IOException {
- while (orderUtils.hasNextUnseqResource()
- && orderUtils.isOverlapped(endpointTime,
orderUtils.getNextUnseqFileResource(false))) {
+ while (orderUtils.hasNextUnseqResource() &&
orderUtils.isCurUnSeqOverlappedWith(endpointTime)) {
unpackUnseqTsFileResource();
}
- while (orderUtils.hasNextSeqResource()
- && orderUtils.isOverlapped(endpointTime,
orderUtils.getNextSeqFileResource(false))) {
+ while (orderUtils.hasNextSeqResource() &&
orderUtils.isCurSeqOverlappedWith(endpointTime)) {
unpackSeqTsFileResource();
}
}
@@ -1256,15 +1258,15 @@ public class SeriesScanUtil {
long getOrderTime(Statistics<? extends Object> statistics);
- long getOrderTime(TsFileResource fileResource);
-
long getOverlapCheckTime(Statistics<? extends Object> range);
boolean isOverlapped(Statistics<? extends Object> left, Statistics<?
extends Object> right);
boolean isOverlapped(long time, Statistics<? extends Object> right);
- boolean isOverlapped(long time, TsFileResource right);
+ boolean isCurSeqOverlappedWith(long time);
+
+ boolean isCurUnSeqOverlappedWith(long time);
<T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
@@ -1300,12 +1302,6 @@ public class SeriesScanUtil {
return statistics.getEndTime();
}
- @SuppressWarnings("squid:S3740")
- @Override
- public long getOrderTime(TsFileResource fileResource) {
- return fileResource.getEndTime(seriesPath.getIDeviceID());
- }
-
@SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
@@ -1325,8 +1321,13 @@ public class SeriesScanUtil {
}
@Override
- public boolean isOverlapped(long time, TsFileResource right) {
- return time <= right.getEndTime(seriesPath.getIDeviceID());
+ public boolean isCurSeqOverlappedWith(long time) {
+ return time <= dataSource.getCurrentSeqOrderTime(curSeqFileIndex);
+ }
+
+ @Override
+ public boolean isCurUnSeqOverlappedWith(long time) {
+ return time <= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex);
}
@Override
@@ -1365,30 +1366,26 @@ public class SeriesScanUtil {
@Override
public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
- TsFileResource tsFileResource =
dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(),
true, false)) {
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID)) {
+ if (dataSource.isSeqSatisfied(
+ deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(),
false)) {
break;
}
curSeqFileIndex--;
}
- return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID);
}
@Override
public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
- TsFileResource tsFileResource =
dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(),
false, false)) {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false,
deviceID)) {
+ if (dataSource.isUnSeqSatisfied(
+ deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(),
false)) {
break;
}
curUnseqFileIndex++;
}
- return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex, false,
deviceID);
}
@Override
@@ -1423,12 +1420,6 @@ public class SeriesScanUtil {
return statistics.getStartTime();
}
- @SuppressWarnings("squid:S3740")
- @Override
- public long getOrderTime(TsFileResource fileResource) {
- return fileResource.getStartTime(seriesPath.getIDeviceID());
- }
-
@SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
@@ -1448,8 +1439,13 @@ public class SeriesScanUtil {
}
@Override
- public boolean isOverlapped(long time, TsFileResource right) {
- return time >= right.getStartTime(seriesPath.getIDeviceID());
+ public boolean isCurSeqOverlappedWith(long time) {
+ return time >= dataSource.getCurrentSeqOrderTime(curSeqFileIndex);
+ }
+
+ @Override
+ public boolean isCurUnSeqOverlappedWith(long time) {
+ return time >= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex);
}
@Override
@@ -1488,30 +1484,26 @@ public class SeriesScanUtil {
@Override
public boolean hasNextSeqResource() {
- while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
- TsFileResource tsFileResource =
dataSource.getSeqResourceByIndex(curSeqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(),
true, false)) {
+ while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID)) {
+ if (dataSource.isSeqSatisfied(
+ deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(),
false)) {
break;
}
curSeqFileIndex++;
}
- return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
+ return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID);
}
@Override
public boolean hasNextUnseqResource() {
- while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
- TsFileResource tsFileResource =
dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
- if (tsFileResource != null
- && tsFileResource.isSatisfied(
- seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(),
false, false)) {
+ while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true,
deviceID)) {
+ if (dataSource.isUnSeqSatisfied(
+ deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(),
false)) {
break;
}
curUnseqFileIndex++;
}
- return dataSource.hasNextUnseqResource(curUnseqFileIndex);
+ return dataSource.hasNextUnseqResource(curUnseqFileIndex, true,
deviceID);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index 97723754f7b..3557646ab82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -261,7 +261,7 @@ public class TimeSeriesMetadataCache {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TimeSeriesMetadataCacheKey.class)
- + 2 * RamUsageEstimator.shallowSizeOfInstance(String.class);
+ + RamUsageEstimator.shallowSizeOfInstance(String.class);
private final int regionId;
private final long timePartitionId;
@@ -296,9 +296,7 @@ public class TimeSeriesMetadataCache {
}
public long getRetainedSizeInBytes() {
- return INSTANCE_SIZE
- + sizeOfCharArray(((PlainDeviceID) device).toStringID().length())
- + sizeOfCharArray(measurement.length());
+ return INSTANCE_SIZE + device.ramBytesUsed() +
sizeOfCharArray(measurement.length());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 14a00f81a9b..d927ac3ac82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -117,7 +117,7 @@ public class ReadPointCompactionPerformer
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
boolean isAligned = deviceInfo.right;
- queryDataSource.fillOrderIndexes(((PlainDeviceID)
device).toStringID(), true);
+ queryDataSource.fillOrderIndexes(device, true);
if (isAligned) {
compactAlignedSeries(
@@ -220,7 +220,7 @@ public class ReadPointCompactionPerformer
device,
measurementListArray[i],
fragmentInstanceContext,
- queryDataSource,
+ new QueryDataSource(queryDataSource),
compactionWriter,
schemaMap,
i)));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index 41b45bd3641..660fbea5861 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.read;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
+import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.ArrayList;
import java.util.Comparator;
@@ -41,8 +42,24 @@ public class QueryDataSource {
*/
private final List<TsFileResource> seqResources;
+ private int curSeqIndex = -1;
+
+ // asc: startTime; desc: endTime
+ private long curSeqOrderTime = 0;
+
+ private Boolean curSeqSatisfied = null;
+
private final List<TsFileResource> unseqResources;
+ private int curUnSeqIndex = -1;
+
+ // asc: startTime; desc: endTime
+ private long curUnSeqOrderTime = 0;
+
+ private Boolean curUnSeqSatisfied = null;
+
+ private boolean isSingleDevice;
+
/* The traversal order of unseqResources (different for each device) */
private int[] unSeqFileOrderIndex;
@@ -56,6 +73,14 @@ public class QueryDataSource {
this.unseqResources = unseqResources;
}
+ // Used for compaction: unlike a query, where each QueryDataSource serves only one series,
+ // a compaction task reuses the same underlying resources for multiple series.
+ public QueryDataSource(QueryDataSource other) {
+ this.seqResources = other.seqResources;
+ this.unseqResources = other.unseqResources;
+ this.unSeqFileOrderIndex = other.unSeqFileOrderIndex;
+ }
+
public List<TsFileResource> getSeqResources() {
return seqResources;
}
@@ -72,6 +97,40 @@ public class QueryDataSource {
this.dataTTL = dataTTL;
}
+ public boolean hasNextSeqResource(int curIndex, boolean ascending, IDeviceID
deviceID) {
+ boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0;
+ if (res && curIndex != this.curSeqIndex) {
+ this.curSeqIndex = curIndex;
+ this.curSeqOrderTime = seqResources.get(curIndex).getOrderTime(deviceID,
ascending);
+ this.curSeqSatisfied = null;
+ }
+ return res;
+ }
+
+ public boolean isSeqSatisfied(
+ IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) {
+ if (curIndex != this.curSeqIndex) {
+ throw new IllegalArgumentException(
+ String.format("curIndex %d is not equal to curSeqIndex %d",
curIndex, this.curSeqIndex));
+ }
+ if (curSeqSatisfied == null) {
+ TsFileResource tsFileResource = seqResources.get(curSeqIndex);
+ curSeqSatisfied =
+ tsFileResource != null
+ && (isSingleDevice || tsFileResource.isSatisfied(deviceID,
timeFilter, true, debug));
+ }
+
+ return curSeqSatisfied;
+ }
+
+ public long getCurrentSeqOrderTime(int curIndex) {
+ if (curIndex != this.curSeqIndex) {
+ throw new IllegalArgumentException(
+ String.format("curIndex %d is not equal to curSeqIndex %d",
curIndex, this.curSeqIndex));
+ }
+ return this.curSeqOrderTime;
+ }
+
public TsFileResource getSeqResourceByIndex(int curIndex) {
if (curIndex < seqResources.size()) {
return seqResources.get(curIndex);
@@ -79,6 +138,43 @@ public class QueryDataSource {
return null;
}
+ public boolean hasNextUnseqResource(int curIndex, boolean ascending,
IDeviceID deviceID) {
+ boolean res = curIndex < unseqResources.size();
+ if (res && curIndex != this.curUnSeqIndex) {
+ this.curUnSeqIndex = curIndex;
+ this.curUnSeqOrderTime =
+
unseqResources.get(unSeqFileOrderIndex[curIndex]).getOrderTime(deviceID,
ascending);
+ this.curUnSeqSatisfied = null;
+ }
+ return res;
+ }
+
+ public boolean isUnSeqSatisfied(
+ IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) {
+ if (curIndex != this.curUnSeqIndex) {
+ throw new IllegalArgumentException(
+ String.format(
+ "curIndex %d is not equal to curUnSeqIndex %d", curIndex,
this.curUnSeqIndex));
+ }
+ if (curUnSeqSatisfied == null) {
+ TsFileResource tsFileResource =
unseqResources.get(unSeqFileOrderIndex[curIndex]);
+ curUnSeqSatisfied =
+ tsFileResource != null
+ && (isSingleDevice || tsFileResource.isSatisfied(deviceID,
timeFilter, false, debug));
+ }
+
+ return curUnSeqSatisfied;
+ }
+
+ public long getCurrentUnSeqOrderTime(int curIndex) {
+ if (curIndex != this.curUnSeqIndex) {
+ throw new IllegalArgumentException(
+ String.format(
+ "curIndex %d is not equal to curSeqIndex %d", curIndex,
this.curUnSeqIndex));
+ }
+ return this.curUnSeqOrderTime;
+ }
+
public TsFileResource getUnseqResourceByIndex(int curIndex) {
int actualIndex = unSeqFileOrderIndex[curIndex];
if (actualIndex < unseqResources.size()) {
@@ -87,14 +183,6 @@ public class QueryDataSource {
return null;
}
- public boolean hasNextSeqResource(int curIndex, boolean ascending) {
- return ascending ? curIndex < seqResources.size() : curIndex >= 0;
- }
-
- public boolean hasNextUnseqResource(int curIndex) {
- return curIndex < unseqResources.size();
- }
-
public int getSeqResourcesSize() {
return seqResources.size();
}
@@ -103,15 +191,13 @@ public class QueryDataSource {
return unseqResources.size();
}
- public void fillOrderIndexes(String deviceId, boolean ascending) {
+ public void fillOrderIndexes(IDeviceID deviceId, boolean ascending) {
TreeMap<Long, List<Integer>> orderTimeToIndexMap =
ascending ? new TreeMap<>() : new TreeMap<>(descendingComparator);
int index = 0;
for (TsFileResource resource : unseqResources) {
orderTimeToIndexMap
- .computeIfAbsent(
- resource.getOrderTime(new PlainDeviceID(deviceId), ascending),
- key -> new ArrayList<>())
+ .computeIfAbsent(resource.getOrderTime(deviceId, ascending), key ->
new ArrayList<>())
.add(index++);
}
@@ -124,4 +210,12 @@ public class QueryDataSource {
}
this.unSeqFileOrderIndex = unSeqFileOrderIndexArray;
}
+
+ public boolean isSingleDevice() {
+ return isSingleDevice;
+ }
+
+ public void setSingleDevice(boolean singleDevice) {
+ isSingleDevice = singleDevice;
+ }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java
index 1f0f862e5ba..1721dc1e1aa 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/PlainDeviceID.java
@@ -35,7 +35,7 @@ public class PlainDeviceID implements IDeviceID {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(PlainDeviceID.class)
+ RamUsageEstimator.shallowSizeOfInstance(String.class);
- String deviceID;
+ private final String deviceID;
public PlainDeviceID(String deviceID) {
this.deviceID = deviceID;