This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch caLastOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bff77d369795ac440fcf8fb3d98d3c29776abb57 Author: JackieTien97 <[email protected]> AuthorDate: Tue May 16 19:43:23 2023 +0800 cp --- .../last/AbstractUpdateLastCacheOperator.java | 13 +++++++++++++ .../last/AlignedUpdateLastCacheOperator.java | 21 ++++++++++----------- .../process/last/UpdateLastCacheOperator.java | 13 ++++++------- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 4 ++++ 4 files changed, 33 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java index f0b1ffc49a..5ac2e13c05 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AbstractUpdateLastCacheOperator.java @@ -19,18 +19,23 @@ package org.apache.iotdb.db.mpp.execution.operator.process.last; +import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.Nullable; + public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator { protected static final TsBlock LAST_QUERY_EMPTY_TSBLOCK = new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT)) @@ -80,6 +85,14 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator return databaseName; } + protected void updateLastCache( + long time, @Nullable TsPrimitiveType value, MeasurementPath fullPath) { + if (needUpdateCache) { + TimeValuePair timeValuePair = new TimeValuePair(time, value); + lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE); + } + } + @Override public boolean hasNext() throws Exception { return child.hasNextWithTimer(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java index a75581eb03..85a5900616 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; @@ -63,25 +62,25 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera tsBlockBuilder.reset(); for (int i = 0; i + 1 < res.getValueColumnCount(); i += 2) { + MeasurementPath measurementPath = + new MeasurementPath( + devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)), + seriesPath.getSchemaList().get(i / 2), + true); if (!res.getColumn(i).isNull(0)) { long lastTime = res.getColumn(i).getLong(0); TsPrimitiveType lastValue = res.getColumn(i + 1).getTsPrimitiveType(0); - MeasurementPath measurementPath = - new MeasurementPath( - devicePath.concatNode(seriesPath.getMeasurementList().get(i / 2)), - seriesPath.getSchemaList().get(i / 2), - true); - if (needUpdateCache) { - TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue); - lastCache.updateLastCache( - getDatabaseName(), measurementPath, timeValuePair, false, Long.MIN_VALUE); - } + updateLastCache(lastTime, lastValue, measurementPath); LastQueryUtil.appendLastValue( tsBlockBuilder, lastTime, measurementPath.getFullPath(), lastValue.getStringValue(), seriesPath.getSchemaList().get(i / 2).getType().name()); + } else { + // we still need to update last cache if there is no data for this time series to avoid + // scanning all files each time + updateLastCache(Long.MIN_VALUE, null, measurementPath); } } return !tsBlockBuilder.isEmpty() ? tsBlockBuilder.build() : LAST_QUERY_EMPTY_TSBLOCK; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java index 7bcc5a61e5..e45c18c019 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; @@ -34,10 +33,10 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator { // fullPath for queried time series // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only // accept PartialPath - private MeasurementPath fullPath; + private final MeasurementPath fullPath; // dataType for queried time series; - private String dataType; + private final String dataType; public UpdateLastCacheOperator( OperatorContext operatorContext, @@ -65,16 +64,16 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator { // last value is null if (res.getColumn(0).isNull(0)) { + // we still need to update last cache if there is no data for this time series to avoid + // scanning all files each time + updateLastCache(Long.MIN_VALUE, null, fullPath); return LAST_QUERY_EMPTY_TSBLOCK; } long lastTime = res.getColumn(0).getLong(0); TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0); - if (needUpdateCache) { - TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue); - lastCache.updateLastCache(getDatabaseName(), fullPath, timeValuePair, false, Long.MIN_VALUE); - } + updateLastCache(lastTime, lastValue, fullPath); tsBlockBuilder.reset(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 4e945c8be6..e73bdd4236 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2031,6 +2031,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath); if (timeValuePair == null) { // last value is not cached return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); + } else if (timeValuePair.getValue() == null) { // there is no data for this time series + return null; } else if (!LastQueryUtil.satisfyFilter( updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()), timeValuePair)) { // cached last value is not satisfied @@ -2125,6 +2127,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath); if (timeValuePair == null) { // last value is not cached unCachedMeasurementIndexes.add(i); + } else if (timeValuePair.getValue() == null) { + // there is no data for this time series, just ignore } else if (!LastQueryUtil.satisfyFilter( updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()), timeValuePair)) { // cached last value is not satisfied
