This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9603556d833 [To dev/1.3] Merge last query scan node of same device
9603556d833 is described below
commit 9603556d833c40d9fdd0a4bdff682cbb5272729d
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 11 11:01:22 2025 +0800
[To dev/1.3] Merge last query scan node of same device
---
.../db/queryengine/common/MPPQueryContext.java | 29 +++
.../execution/MemoryEstimationHelper.java | 28 +++
.../execution/fragment/DataNodeQueryContext.java | 21 +-
.../last/AbstractUpdateLastCacheOperator.java | 25 +-
.../last/AlignedUpdateLastCacheOperator.java | 11 +-
.../AlignedUpdateViewPathLastCacheOperator.java | 6 +-
.../operator/process/last/LastQueryOperator.java | 1 +
.../process/last/LastQuerySortOperator.java | 10 +-
.../process/last/UpdateLastCacheOperator.java | 28 ++-
.../db/queryengine/plan/analyze/Analysis.java | 33 ++-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 83 +++++--
.../queryengine/plan/analyze/ExpressionUtils.java | 14 +-
.../memory/StatementMemorySourceVisitor.java | 3 +-
.../plan/planner/LogicalPlanBuilder.java | 152 +++++--------
.../plan/planner/OperatorTreeGenerator.java | 251 ++++++++++-----------
.../plan/planner/SubPlanTypeExtractor.java | 6 -
.../planner/distribution/ExchangeNodeAdder.java | 15 +-
.../planner/distribution/NodeGroupContext.java | 12 +
.../SimpleFragmentParallelPlanner.java | 12 +-
.../plan/planner/distribution/SourceRewriter.java | 152 ++++++++++---
.../plan/planner/plan/node/PlanGraphPrinter.java | 18 +-
.../plan/planner/plan/node/PlanNodeType.java | 14 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 -
.../plan/node/process/MultiChildProcessNode.java | 4 +
.../plan/node/process/last/LastQueryNode.java | 141 ++++++++++--
.../plan/node/source/AlignedLastQueryScanNode.java | 245 --------------------
.../plan/node/source/LastQueryScanNode.java | 193 +++++++++++++---
.../execution/operator/LastQueryOperatorTest.java | 6 +-
.../operator/LastQuerySortOperatorTest.java | 6 +-
.../plan/planner/distribution/LastQueryTest.java | 25 +-
.../logical/DataQueryLogicalPlannerTest.java | 86 +++----
.../node/source/LastQueryScanNodeSerdeTest.java | 70 ++++++
.../iotdb/commons/partition/DataPartition.java | 15 +-
.../apache/iotdb/commons/partition/Partition.java | 2 +-
.../org/apache/iotdb/commons/path/PartialPath.java | 13 ++
35 files changed, 1012 insertions(+), 723 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 1621327f9f7..63970762de7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -81,6 +81,13 @@ public class MPPQueryContext {
// constructing some Expression and PlanNode.
private final MemoryReservationManager memoryReservationManager;
+ private static final int minSizeToUseSampledTimeseriesOperandMemCost = 100;
+ private double avgTimeseriesOperandMemCost = 0;
+ private int numsOfSampledTimeseriesOperand = 0;
+ // When there is no view in a last query and no device exists in multiple
regions,
+ // the updateScanNum process in distributed planning can be skipped.
+ private boolean needUpdateScanNumForLastQuery = false;
+
private boolean userQuery = false;
public MPPQueryContext(QueryId queryId) {
@@ -331,8 +338,30 @@ public class MPPQueryContext {
this.memoryReservationManager.releaseMemoryCumulatively(bytes);
}
+ public boolean useSampledAvgTimeseriesOperandMemCost() {
+ return numsOfSampledTimeseriesOperand >=
minSizeToUseSampledTimeseriesOperandMemCost;
+ }
+
+ public long getAvgTimeseriesOperandMemCost() {
+ return (long) avgTimeseriesOperandMemCost;
+ }
+
+ public void calculateAvgTimeseriesOperandMemCost(long current) {
+ numsOfSampledTimeseriesOperand++;
+ avgTimeseriesOperandMemCost +=
+ (current - avgTimeseriesOperandMemCost) /
numsOfSampledTimeseriesOperand;
+ }
+
// endregion
+ public boolean needUpdateScanNumForLastQuery() {
+ return needUpdateScanNumForLastQuery;
+ }
+
+ public void setNeedUpdateScanNumForLastQuery(boolean
needUpdateScanNumForLastQuery) {
+ this.needUpdateScanNumForLastQuery = needUpdateScanNumForLastQuery;
+ }
+
public boolean isUserQuery() {
return userQuery;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
index a18e2dbc58b..9da6b85e9f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java
@@ -28,7 +28,9 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
public class MemoryEstimationHelper {
@@ -41,6 +43,11 @@ public class MemoryEstimationHelper {
private static final long MEASUREMENT_PATH_INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(AlignedPath.class);
+ private static final long ARRAY_LIST_INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
+ private static final long INTEGER_INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(Integer.class);
+
private MemoryEstimationHelper() {
// hide the constructor
}
@@ -86,4 +93,25 @@ public class MemoryEstimationHelper {
}
return totalSize;
}
+
+ // This method should only be called if the content in the current
PartialPath comes from other
+ // structures whose memory cost has already been calculated.
+ public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final
PartialPath partialPath) {
+ if (partialPath == null) {
+ return 0;
+ }
+ return PARTIAL_PATH_INSTANCE_SIZE +
RamUsageEstimator.shallowSizeOf(partialPath.getNodes());
+ }
+
+ public static long getEstimatedSizeOfIntegerArrayList(List<Integer>
integerArrayList) {
+ if (integerArrayList == null) {
+ return 0L;
+ }
+ long size = ARRAY_LIST_INSTANCE_SIZE;
+ size +=
+ (long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
+ + (long) integerArrayList.size() * (long)
RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ size += INTEGER_INSTANCE_SIZE * integerArrayList.size();
+ return RamUsageEstimator.alignObjectSize(size);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java
index 881ff9dc8a2..ffa3ead32e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java
@@ -26,8 +26,8 @@ import org.apache.tsfile.utils.Pair;
import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,7 +43,7 @@ public class DataNodeQueryContext {
private final ReentrantLock lock = new ReentrantLock();
public DataNodeQueryContext(int dataNodeFINum) {
- this.uncachedPathToSeriesScanInfo = new HashMap<>();
+ this.uncachedPathToSeriesScanInfo = new ConcurrentHashMap<>();
this.dataNodeFINum = new AtomicInteger(dataNodeFINum);
}
@@ -59,15 +59,24 @@ public class DataNodeQueryContext {
return uncachedPathToSeriesScanInfo.get(path);
}
+ public Map<PartialPath, Pair<AtomicInteger, TimeValuePair>>
getUncachedPathToSeriesScanInfo() {
+ return uncachedPathToSeriesScanInfo;
+ }
+
public int decreaseDataNodeFINum() {
return dataNodeFINum.decrementAndGet();
}
- public void lock() {
- lock.lock();
+ public void lock(boolean isDeviceInMultiRegion) {
+ // When a device exists in only one region, there will be no intermediate
state.
+ if (isDeviceInMultiRegion) {
+ lock.lock();
+ }
}
- public void unLock() {
- lock.unlock();
+ public void unLock(boolean isDeviceInMultiRegion) {
+ if (isDeviceInMultiRegion) {
+ lock.unlock();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
index b4ce8943ea9..30d305bcec7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java
@@ -61,12 +61,15 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
protected String databaseName;
+ protected boolean deviceInMultiRegion;
+
protected AbstractUpdateLastCacheOperator(
- OperatorContext operatorContext,
- Operator child,
- DataNodeSchemaCache dataNodeSchemaCache,
- boolean needUpdateCache,
- boolean needUpdateNullEntry) {
+ final OperatorContext operatorContext,
+ final Operator child,
+ final DataNodeSchemaCache dataNodeSchemaCache,
+ final boolean needUpdateCache,
+ final boolean needUpdateNullEntry,
+ final boolean deviceInMultiRegion) {
this.operatorContext = operatorContext;
this.child = child;
this.lastCache = dataNodeSchemaCache;
@@ -75,6 +78,7 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
this.dataNodeQueryContext =
operatorContext.getDriverContext().getFragmentInstanceContext().getDataNodeQueryContext();
+ this.deviceInMultiRegion = deviceInMultiRegion;
}
@Override
@@ -103,8 +107,8 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
return;
}
try {
- dataNodeQueryContext.lock();
- Pair<AtomicInteger, TimeValuePair> seriesScanInfo =
+ dataNodeQueryContext.lock(deviceInMultiRegion);
+ final Pair<AtomicInteger, TimeValuePair> seriesScanInfo =
dataNodeQueryContext.getSeriesScanInfo(fullPath);
// may enter this case when use TTL
@@ -112,6 +116,11 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
return;
}
+ if (!deviceInMultiRegion) {
+ lastCache.updateLastCache(
+ getDatabaseName(), fullPath, new TimeValuePair(time, value),
false, Long.MIN_VALUE);
+ return;
+ }
// update cache in DataNodeQueryContext
if (seriesScanInfo.right == null || time >
seriesScanInfo.right.getTimestamp()) {
seriesScanInfo.right = new TimeValuePair(time, value);
@@ -122,7 +131,7 @@ public abstract class AbstractUpdateLastCacheOperator
implements ProcessOperator
getDatabaseName(), fullPath, seriesScanInfo.right, false,
Long.MIN_VALUE);
}
} finally {
- dataNodeQueryContext.unLock();
+ dataNodeQueryContext.unLock(deviceInMultiRegion);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index c3f5fff6f78..85c3134c429 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -47,8 +47,15 @@ public class AlignedUpdateLastCacheOperator extends
AbstractUpdateLastCacheOpera
AlignedPath seriesPath,
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache,
- boolean needUpdateNullEntry) {
- super(operatorContext, child, dataNodeSchemaCache, needUpdateCache,
needUpdateNullEntry);
+ boolean needUpdateNullEntry,
+ boolean deviceInMultiRegion) {
+ super(
+ operatorContext,
+ child,
+ dataNodeSchemaCache,
+ needUpdateCache,
+ needUpdateNullEntry,
+ deviceInMultiRegion);
this.seriesPath = seriesPath;
this.devicePath = seriesPath.getDevicePath();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java
index 9a8a309b2ec..3fc9f0412bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java
@@ -41,14 +41,16 @@ public class AlignedUpdateViewPathLastCacheOperator extends
AlignedUpdateLastCac
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache,
boolean needUpdateNullEntry,
- String outputViewPath) {
+ String outputViewPath,
+ boolean deviceInMultiRegion) {
super(
operatorContext,
child,
seriesPath,
dataNodeSchemaCache,
needUpdateCache,
- needUpdateNullEntry);
+ needUpdateNullEntry,
+ deviceInMultiRegion);
checkArgument(seriesPath.getMeasurementList().size() == 1);
this.outputViewPath = outputViewPath;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
index 9f50dcb6e7d..dd41bbd7afa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java
@@ -116,6 +116,7 @@ public class LastQueryOperator implements ProcessOperator {
return null;
} else if (!tsBlock.isEmpty()) {
LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
+ return null;
}
} else {
children.get(currentIndex).close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
index 0d102073802..e40cb1ad131 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java
@@ -153,7 +153,8 @@ public class LastQuerySortOperator implements
ProcessOperator {
while (keepGoing(start, maxRuntime, endIndex)) {
- if (prepareData()) {
+ prepareData();
+ if (previousTsBlock == null) {
return null;
}
@@ -179,21 +180,18 @@ public class LastQuerySortOperator implements
ProcessOperator {
&& !tsBlockBuilder.isFull();
}
- private boolean prepareData() throws Exception {
+ private void prepareData() throws Exception {
if (previousTsBlock == null || previousTsBlock.getPositionCount() <=
previousTsBlockIndex) {
if (children.get(currentIndex).hasNextWithTimer()) {
previousTsBlock = children.get(currentIndex).nextWithTimer();
previousTsBlockIndex = 0;
- if (previousTsBlock == null) {
- return true;
- }
+ return;
} else {
children.get(currentIndex).close();
children.set(currentIndex, null);
}
currentIndex++;
}
- return false;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java
index d49e89c19ef..d55f1d9fd4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -53,7 +53,33 @@ public class UpdateLastCacheOperator extends
AbstractUpdateLastCacheOperator {
DataNodeSchemaCache dataNodeSchemaCache,
boolean needUpdateCache,
boolean isNeedUpdateNullEntry) {
- super(operatorContext, child, dataNodeSchemaCache, needUpdateCache,
isNeedUpdateNullEntry);
+ this(
+ operatorContext,
+ child,
+ fullPath,
+ dataType,
+ dataNodeSchemaCache,
+ needUpdateCache,
+ isNeedUpdateNullEntry,
+ true);
+ }
+
+ public UpdateLastCacheOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ MeasurementPath fullPath,
+ TSDataType dataType,
+ DataNodeSchemaCache dataNodeSchemaCache,
+ boolean needUpdateCache,
+ boolean isNeedUpdateNullEntry,
+ boolean deviceInMultiRegion) {
+ super(
+ operatorContext,
+ child,
+ dataNodeSchemaCache,
+ needUpdateCache,
+ isNeedUpdateNullEntry,
+ deviceInMultiRegion);
this.fullPath = fullPath;
this.dataType = dataType.name();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 93e386b9bd5..c3f31fa8a3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -111,6 +111,10 @@ public class Analysis implements IAnalysis {
// map from device name to series/aggregation under this device
private Set<Expression> sourceExpressions;
+ // In order to perform some optimization, when the source expression is
+ // not used later, nothing will be placed in this structure.
+ private boolean shouldHaveSourceExpression;
+
// input expressions of aggregations to be calculated
private Set<Expression> sourceTransformExpressions = new HashSet<>();
@@ -231,7 +235,9 @@ public class Analysis implements IAnalysis {
// Key: non-writable view expression, Value: corresponding source expressions
private Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap;
- private Set<Expression> lastQueryBaseExpressions;
+ private Map<String, Map<String, Expression>>
lastQueryOutputPathToSourceExpressionMap;
+
+ private Set<String> deviceExistViewSet;
// header of result dataset
private DatasetHeader respDatasetHeader;
@@ -610,6 +616,14 @@ public class Analysis implements IAnalysis {
this.sourceExpressions = sourceExpressions;
}
+ public void setShouldHaveSourceExpression(boolean
shouldHaveSourceExpression) {
+ this.shouldHaveSourceExpression = shouldHaveSourceExpression;
+ }
+
+ public boolean shouldHaveSourceExpression() {
+ return shouldHaveSourceExpression;
+ }
+
public Set<Expression> getSourceTransformExpressions() {
return sourceTransformExpressions;
}
@@ -875,12 +889,21 @@ public class Analysis implements IAnalysis {
this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
}
- public Set<Expression> getLastQueryBaseExpressions() {
- return this.lastQueryBaseExpressions;
+ public Map<String, Map<String, Expression>>
getLastQueryOutputPathToSourceExpressionMap() {
+ return lastQueryOutputPathToSourceExpressionMap;
+ }
+
+ public void setLastQueryOutputPathToSourceExpressionMap(
+ Map<String, Map<String, Expression>>
lastQueryOutputPathToSourceExpressionMap) {
+ this.lastQueryOutputPathToSourceExpressionMap =
lastQueryOutputPathToSourceExpressionMap;
+ }
+
+ public Set<String> getDeviceExistViewSet() {
+ return deviceExistViewSet;
}
- public void setLastQueryBaseExpressions(Set<Expression>
lastQueryBaseExpressions) {
- this.lastQueryBaseExpressions = lastQueryBaseExpressions;
+ public void setDeviceExistViewSet(Set<String> deviceExistViewSet) {
+ this.deviceExistViewSet = deviceExistViewSet;
}
public Map<Expression, List<Expression>>
getLastQueryNonWritableViewSourceExpressionMap() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index b4f163838d9..e477a46a1f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -186,6 +186,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -601,48 +602,93 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
selectExpressions.add(resultColumn.getExpression());
}
- analyzeLastSource(analysis, selectExpressions, schemaTree, context);
-
analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
- // fetch partition information
- analyzeDataPartition(analysis, queryStatement, schemaTree, context);
-
- return analysis;
+ return analyzeLastSourceAndDataPartition(analysis, selectExpressions,
schemaTree, context);
}
- private void analyzeLastSource(
+ private Analysis analyzeLastSourceAndDataPartition(
Analysis analysis,
List<Expression> selectExpressions,
ISchemaTree schemaTree,
MPPQueryContext context) {
- Set<Expression> sourceExpressions = new LinkedHashSet<>();
- Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>();
+
+ // For fetch data partition
+ Set<String> allDeviceSet = new HashSet<>();
+
+ // For LogicalPlan
+ Set<String> deviceExistViewSet = new HashSet<>();
+ Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new
LinkedHashMap<>();
Map<Expression, List<Expression>>
lastQueryNonWritableViewSourceExpressionMap = null;
+ Ordering timeseriesOrdering = analysis.getTimeseriesOrderingForLastQuery();
+
+ boolean hasAliasView = false;
for (Expression selectExpression : selectExpressions) {
for (Expression lastQuerySourceExpression :
bindSchemaForExpression(selectExpression, schemaTree, context)) {
if (lastQuerySourceExpression instanceof TimeSeriesOperand) {
- lastQueryBaseExpressions.add(lastQuerySourceExpression);
- sourceExpressions.add(lastQuerySourceExpression);
- } else {
- if (lastQueryNonWritableViewSourceExpressionMap == null) {
- lastQueryNonWritableViewSourceExpressionMap = new HashMap<>();
+ TimeSeriesOperand timeSeriesOperand = (TimeSeriesOperand)
lastQuerySourceExpression;
+ MeasurementPath outputPath =
+ (MeasurementPath)
+ (timeSeriesOperand.isViewExpression()
+ ? timeSeriesOperand.getViewPath()
+ : timeSeriesOperand.getPath());
+ String actualDeviceID =
+
ExpressionAnalyzer.getDeviceNameInSourceExpression(timeSeriesOperand);
+ String outputDeviceID =
+ timeSeriesOperand.isViewExpression() ? outputPath.getDevice() :
actualDeviceID;
+ if (timeSeriesOperand.isViewExpression()) {
+ deviceExistViewSet.add(outputDeviceID);
+ if (!hasAliasView) {
+ allDeviceSet.addAll(outputPathToSourceExpressionMap.keySet());
+ hasAliasView = true;
+ }
+ allDeviceSet.add(actualDeviceID);
+ } else if (hasAliasView) {
+ allDeviceSet.add(actualDeviceID);
}
+ // If we use the actual deviceId, it may overwrite another expression of
the same measurement in
+ // Map<String, Expression>.
+ outputPathToSourceExpressionMap
+ .computeIfAbsent(
+ outputDeviceID,
+ k ->
+ timeseriesOrdering != null
+ ? new
TreeMap<>(timeseriesOrdering.getStringComparator())
+ : new LinkedHashMap<>())
+ .put(outputPath.getMeasurement(), timeSeriesOperand);
+ } else {
+ lastQueryNonWritableViewSourceExpressionMap =
+ lastQueryNonWritableViewSourceExpressionMap == null
+ ? new HashMap<>()
+ : lastQueryNonWritableViewSourceExpressionMap;
List<Expression> sourceExpressionsOfNonWritableView =
searchSourceExpressions(lastQuerySourceExpression);
lastQueryNonWritableViewSourceExpressionMap.putIfAbsent(
lastQuerySourceExpression, sourceExpressionsOfNonWritableView);
- sourceExpressions.addAll(sourceExpressionsOfNonWritableView);
+ for (Expression expression : sourceExpressionsOfNonWritableView) {
+
allDeviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
+ }
}
}
}
+ if (allDeviceSet.isEmpty()) {
+ allDeviceSet = outputPathToSourceExpressionMap.keySet();
+ } else if (!hasAliasView) {
+ allDeviceSet.addAll(outputPathToSourceExpressionMap.keySet());
+ }
- analysis.setSourceExpressions(sourceExpressions);
- analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions);
+ analysis.setShouldHaveSourceExpression(!allDeviceSet.isEmpty());
+
analysis.setLastQueryOutputPathToSourceExpressionMap(outputPathToSourceExpressionMap);
+ analysis.setDeviceExistViewSet(
+ deviceExistViewSet.isEmpty() ? Collections.emptySet() :
deviceExistViewSet);
analysis.setLastQueryNonWritableViewSourceExpressionMap(
lastQueryNonWritableViewSourceExpressionMap);
+
+ DataPartition dataPartition = fetchDataPartitionByDevices(allDeviceSet,
schemaTree, context);
+ analysis.setDataPartitionInfo(dataPartition);
+ return analysis;
}
private void updateSchemaTreeByViews(
@@ -3185,13 +3231,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
updateSchemaTreeByViews(analysis, schemaTree, context);
logger.debug("[EndFetchSchema]]");
- analyzeLastSource(
+ analyzeLastSourceAndDataPartition(
analysis,
Collections.singletonList(
new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())),
schemaTree,
context);
- analyzeDataPartition(analysis, new QueryStatement(), schemaTree,
context);
}
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
index fc9d3a0aa51..a27e323e1e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
@@ -69,9 +69,17 @@ public class ExpressionUtils {
final MPPQueryContext queryContext) {
List<Expression> resultExpressions = new ArrayList<>();
for (PartialPath actualPath : actualPaths) {
- resultExpressions.add(
- reserveMemoryForExpression(
- queryContext, reconstructTimeSeriesOperand(rawExpression,
actualPath)));
+ Expression expression = reconstructTimeSeriesOperand(rawExpression,
actualPath);
+ long memCost;
+ if (queryContext.useSampledAvgTimeseriesOperandMemCost()) {
+ memCost = queryContext.getAvgTimeseriesOperandMemCost();
+ } else {
+ memCost = expression.ramBytesUsed();
+ queryContext.calculateAvgTimeseriesOperandMemCost(memCost);
+ }
+ queryContext.reserveMemoryForFrontEnd(memCost);
+
+ resultExpressions.add(expression);
}
return resultExpressions;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java
index 7a37902dbb9..cb456d3a30e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -69,7 +69,8 @@ public class StatementMemorySourceVisitor
}
private boolean sourceNotExist(StatementMemorySourceContext context) {
- return (context.getAnalysis().getSourceExpressions() == null
+ return !context.getAnalysis().shouldHaveSourceExpression()
+ && (context.getAnalysis().getSourceExpressions() == null
|| context.getAnalysis().getSourceExpressions().isEmpty())
&& (context.getAnalysis().getDeviceToSourceExpressions() == null
|| context.getAnalysis().getDeviceToSourceExpressions().isEmpty())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 4c8eb7f1e39..1d206b03ca1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -81,10 +81,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode;
@@ -109,12 +107,12 @@ import
org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGene
import org.apache.commons.lang3.Validate;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -122,7 +120,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -233,41 +230,24 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planLast(Analysis analysis, Ordering
timeseriesOrdering) {
- Set<String> deviceAlignedSet = new HashSet<>();
- Set<String> deviceExistViewSet = new HashSet<>();
// <Device, <Measurement, Expression>>
- Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new
LinkedHashMap<>();
-
- for (Expression sourceExpression : analysis.getLastQueryBaseExpressions())
{
- MeasurementPath outputPath =
- (MeasurementPath)
- (sourceExpression.isViewExpression()
- ? sourceExpression.getViewPath()
- : ((TimeSeriesOperand) sourceExpression).getPath());
- String outputDevice = outputPath.getDevice();
- outputPathToSourceExpressionMap
- .computeIfAbsent(
- outputDevice,
- k ->
- timeseriesOrdering != null
- ? new TreeMap<>(timeseriesOrdering.getStringComparator())
- : new LinkedHashMap<>())
- .put(outputPath.getMeasurement(), sourceExpression);
- if (outputPath.isUnderAlignedEntity()) {
- deviceAlignedSet.add(outputDevice);
- }
- if (sourceExpression.isViewExpression()) {
- deviceExistViewSet.add(outputDevice);
- }
- }
+ Map<String, Map<String, Expression>> outputPathToSourceExpressionMap =
+ analysis.getLastQueryOutputPathToSourceExpressionMap();
+ Set<String> deviceExistViewSet = analysis.getDeviceExistViewSet();
- List<PlanNode> sourceNodeList = new ArrayList<>();
+ LastQueryNode lastQueryNode =
+ new LastQueryNode(
+ context.getQueryId().genPlanNodeId(),
+ timeseriesOrdering,
+ analysis.getLastQueryNonWritableViewSourceExpressionMap() != null);
for (Map.Entry<String, Map<String, Expression>>
deviceMeasurementExpressionEntry :
outputPathToSourceExpressionMap.entrySet()) {
- String outputDevice = deviceMeasurementExpressionEntry.getKey();
+ String deviceId = deviceMeasurementExpressionEntry.getKey();
Map<String, Expression> measurementToExpressionsOfDevice =
deviceMeasurementExpressionEntry.getValue();
- if (deviceExistViewSet.contains(outputDevice)) {
+
+ boolean deviceExistView = deviceExistViewSet.contains(deviceId);
+ if (deviceExistView) {
// exist view
for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
MeasurementPath selectedPath =
@@ -277,90 +257,66 @@ public class LogicalPlanBuilder {
? sourceExpression.getViewPath().getFullPath()
: null;
- if (selectedPath.isUnderAlignedEntity()) { // aligned series
- sourceNodeList.add(
- reserveMemoryForSeriesSourceNode(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(),
- new AlignedPath(selectedPath),
- outputViewPath)));
- } else { // non-aligned series
- sourceNodeList.add(
- reserveMemoryForSeriesSourceNode(
- new LastQueryScanNode(
- context.getQueryId().genPlanNodeId(), selectedPath,
outputViewPath)));
- }
+ PartialPath devicePath = selectedPath.getDevicePath();
+ // For an expression with a view path, we do not use the deviceId from the
Map.Entry because it
+ // refers to the view device.
+ devicePath.setIDeviceID(selectedPath.getDevice());
+ long memCost =
+ lastQueryNode.addDeviceLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(),
+ devicePath,
+ selectedPath.isUnderAlignedEntity(),
+
Collections.singletonList(selectedPath.getMeasurementSchema()),
+ outputViewPath);
+ this.context.reserveMemoryForFrontEnd(memCost);
}
} else {
- if (deviceAlignedSet.contains(outputDevice)) {
- // aligned series
- List<MeasurementPath> measurementPaths =
- measurementToExpressionsOfDevice.values().stream()
- .map(expression -> (MeasurementPath) ((TimeSeriesOperand)
expression).getPath())
- .collect(Collectors.toList());
- AlignedPath alignedPath = new
AlignedPath(measurementPaths.get(0).getDevicePath());
- for (MeasurementPath measurementPath : measurementPaths) {
- alignedPath.addMeasurement(measurementPath);
- }
- sourceNodeList.add(
- reserveMemoryForSeriesSourceNode(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(), alignedPath,
null)));
- } else {
- // non-aligned series
- for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
- MeasurementPath selectedPath =
- (MeasurementPath) ((TimeSeriesOperand)
sourceExpression).getPath();
- sourceNodeList.add(
- reserveMemoryForSeriesSourceNode(
- new LastQueryScanNode(
- context.getQueryId().genPlanNodeId(), selectedPath,
null)));
- }
+ boolean aligned = false;
+ List<IMeasurementSchema> measurementSchemas =
+ new ArrayList<>(measurementToExpressionsOfDevice.size());
+ PartialPath devicePath = null;
+ for (Expression sourceExpression :
measurementToExpressionsOfDevice.values()) {
+ MeasurementPath selectedPath =
+ (MeasurementPath) ((TimeSeriesOperand)
sourceExpression).getPath();
+ aligned = selectedPath.isUnderAlignedEntity();
+ devicePath = devicePath == null ? selectedPath.getDevicePath() :
devicePath;
+ measurementSchemas.add(selectedPath.getMeasurementSchema());
}
+ // DeviceId is needed in the distribution plan stage
+ devicePath.setIDeviceID(deviceId);
+ long memCost =
+ lastQueryNode.addDeviceLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(),
+ devicePath,
+ aligned,
+ measurementSchemas,
+ null);
+ this.context.reserveMemoryForFrontEnd(memCost);
}
}
+
this.context.reserveMemoryForFrontEnd(lastQueryNode.getMemorySizeOfSharedStructures());
- processLastQueryTransformNode(analysis, sourceNodeList);
-
- if (timeseriesOrdering != null) {
- sourceNodeList.sort(
- Comparator.comparing(
- child -> {
- String sortKey = "";
- if (child instanceof LastQueryScanNode) {
- sortKey = ((LastQueryScanNode)
child).getOutputSymbolForSort();
- } else if (child instanceof AlignedLastQueryScanNode) {
- sortKey = ((AlignedLastQueryScanNode)
child).getOutputSymbolForSort();
- } else if (child instanceof LastQueryTransformNode) {
- sortKey = ((LastQueryTransformNode)
child).getOutputSymbolForSort();
- }
- return sortKey;
- }));
- if (timeseriesOrdering.equals(Ordering.DESC)) {
- Collections.reverse(sourceNodeList);
- }
- }
+ processLastQueryTransformNode(analysis, lastQueryNode);
- this.root =
- new LastQueryNode(
- context.getQueryId().genPlanNodeId(),
- sourceNodeList,
- timeseriesOrdering,
- analysis.getLastQueryNonWritableViewSourceExpressionMap() != null);
+ lastQueryNode.sort();
+ this.root = lastQueryNode;
ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
columnHeader ->
context
.getTypeProvider()
.setType(columnHeader.getColumnName(),
columnHeader.getColumnType()));
+ // After planning is completed, this map is no longer needed
+ lastQueryNode.clearMeasurementSchema2IdxMap();
return this;
}
- private void processLastQueryTransformNode(Analysis analysis, List<PlanNode>
sourceNodeList) {
+ private void processLastQueryTransformNode(Analysis analysis, LastQueryNode
lastQueryNode) {
if (analysis.getLastQueryNonWritableViewSourceExpressionMap() == null) {
return;
}
+ context.setNeedUpdateScanNumForLastQuery(true);
for (Map.Entry<Expression, List<Expression>> entry :
analysis.getLastQueryNonWritableViewSourceExpressionMap().entrySet()) {
@@ -398,7 +354,7 @@ public class LogicalPlanBuilder {
planBuilder.getRoot(),
expression.getViewPath().getFullPath(),
analysis.getType(expression).toString());
- sourceNodeList.add(transformNode);
+ lastQueryNode.addChild(transformNode);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 30d778c079d..2e6a6be8d1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -221,7 +221,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
@@ -278,6 +277,7 @@ import
org.apache.tsfile.read.filter.operator.TimeFilterOperators.TimeGtEq;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TimeDuration;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2754,129 +2754,90 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((SchemaDriverContext)
(context.getDriverContext())).getSchemaRegion());
}
- @Override
- public Operator visitLastQueryScan(LastQueryScanNode node,
LocalExecutionPlanContext context) {
- PartialPath seriesPath = node.getSeriesPath().transformToPartialPath();
- TimeValuePair timeValuePair = null;
- context.dataNodeQueryContext.lock();
- try {
- if (!context.dataNodeQueryContext.unCached(seriesPath)) {
- timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath);
- if (timeValuePair == null) {
- context.dataNodeQueryContext.addUnCachePath(seriesPath,
node.getDataNodeSeriesScanNum());
- }
- }
- } finally {
- context.dataNodeQueryContext.unLock();
- }
-
- if (timeValuePair == null) { // last value is not cached
- return createUpdateLastCacheOperator(node, context,
node.getSeriesPath());
- } else if (timeValuePair.getValue() == null) { // there is no data for
this time series
- return null;
- } else if (!LastQueryUtil.satisfyFilter(
- updateFilterUsingTTL(
- context.getGlobalTimeFilter(),
-
DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevicePath().getNodes())),
- timeValuePair)) { // cached last value is not satisfied
-
- if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
- // time filter is not > or >=, we still need to read from disk
- return createUpdateLastCacheOperator(node, context,
node.getSeriesPath());
- } else { // otherwise, we just ignore it and return null
- return null;
- }
- } else { // cached last value is satisfied, put it into
LastCacheScanOperator
- context.addCachedLastValue(timeValuePair, node.outputPathSymbol());
- return null;
- }
- }
-
- private boolean isFilterGtOrGe(Filter filter) {
+ public static boolean isFilterGtOrGe(Filter filter) {
return filter instanceof TimeGt || filter instanceof TimeGtEq;
}
private UpdateLastCacheOperator createUpdateLastCacheOperator(
- LastQueryScanNode node, LocalExecutionPlanContext context,
MeasurementPath fullPath) {
- SeriesAggregationScanOperator lastQueryScan =
createLastQueryScanOperator(node, context);
- if (node.getOutputViewPath() == null) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- UpdateLastCacheOperator.class.getSimpleName());
- return new UpdateLastCacheOperator(
- operatorContext,
- lastQueryScan,
- fullPath,
- node.getSeriesPath().getSeriesType(),
- DATA_NODE_SCHEMA_CACHE,
- context.isNeedUpdateLastCache(),
- context.isNeedUpdateNullEntry());
- } else {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- UpdateViewPathLastCacheOperator.class.getSimpleName());
- return new UpdateViewPathLastCacheOperator(
- operatorContext,
- lastQueryScan,
- fullPath,
- node.getSeriesPath().getSeriesType(),
- DATA_NODE_SCHEMA_CACHE,
- context.isNeedUpdateLastCache(),
- context.isNeedUpdateNullEntry(),
- node.getOutputViewPath());
- }
+ final LastQueryScanNode node, final LocalExecutionPlanContext context,
final int idx) {
+ IMeasurementSchema measurementSchema = node.getMeasurementSchema(idx);
+ final SeriesAggregationScanOperator lastQueryScan =
+ createLastQueryScanOperator(node, context, measurementSchema);
+ MeasurementPath fullPath =
+
node.getDevicePath().concatAsMeasurementPath(measurementSchema.getMeasurementId());
+ fullPath.setMeasurementSchema(measurementSchema);
+ final OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ UpdateLastCacheOperator.class.getSimpleName());
+
+ return Objects.isNull(node.getOutputViewPath())
+ ? new UpdateLastCacheOperator(
+ operatorContext,
+ lastQueryScan,
+ fullPath,
+ measurementSchema.getType(),
+ DATA_NODE_SCHEMA_CACHE,
+ context.isNeedUpdateLastCache(),
+ context.isNeedUpdateNullEntry())
+ : new UpdateViewPathLastCacheOperator(
+ operatorContext,
+ lastQueryScan,
+ fullPath,
+ measurementSchema.getType(),
+ DATA_NODE_SCHEMA_CACHE,
+ context.isNeedUpdateLastCache(),
+ context.isNeedUpdateNullEntry(),
+ node.getOutputViewPath());
}
private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
- AlignedLastQueryScanNode node, AlignedPath unCachedPath,
LocalExecutionPlanContext context) {
- AlignedSeriesAggregationScanOperator lastQueryScan =
- createLastQueryScanOperator(node, unCachedPath, context);
+ final String outputViewPath,
+ final PlanNodeId planNodeId,
+ final AlignedPath unCachedPath,
+ final LocalExecutionPlanContext context,
+ final boolean deviceInMultiRegion) {
+ final AlignedSeriesAggregationScanOperator lastQueryScan =
+ createLastQueryScanOperator(planNodeId, unCachedPath, context);
- if (node.getOutputViewPath() == null) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- AlignedUpdateLastCacheOperator.class.getSimpleName());
- return new AlignedUpdateLastCacheOperator(
- operatorContext,
- lastQueryScan,
- unCachedPath,
- DATA_NODE_SCHEMA_CACHE,
- context.isNeedUpdateLastCache(),
- context.isNeedUpdateNullEntry());
- } else {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
-
AlignedUpdateViewPathLastCacheOperator.class.getSimpleName());
- return new AlignedUpdateViewPathLastCacheOperator(
- operatorContext,
- lastQueryScan,
- unCachedPath,
- DATA_NODE_SCHEMA_CACHE,
- context.isNeedUpdateLastCache(),
- context.isNeedUpdateNullEntry(),
- node.getOutputViewPath());
- }
+ final OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ planNodeId,
+ AlignedUpdateLastCacheOperator.class.getSimpleName());
+
+ return Objects.isNull(outputViewPath)
+ ? new AlignedUpdateLastCacheOperator(
+ operatorContext,
+ lastQueryScan,
+ unCachedPath,
+ DATA_NODE_SCHEMA_CACHE,
+ context.isNeedUpdateLastCache(),
+ context.isNeedUpdateNullEntry(),
+ deviceInMultiRegion)
+ : new AlignedUpdateViewPathLastCacheOperator(
+ operatorContext,
+ lastQueryScan,
+ unCachedPath,
+ DATA_NODE_SCHEMA_CACHE,
+ context.isNeedUpdateLastCache(),
+ context.isNeedUpdateNullEntry(),
+ outputViewPath,
+ deviceInMultiRegion);
}
private SeriesAggregationScanOperator createLastQueryScanOperator(
- LastQueryScanNode node, LocalExecutionPlanContext context) {
- MeasurementPath seriesPath = node.getSeriesPath();
+ LastQueryScanNode node,
+ LocalExecutionPlanContext context,
+ IMeasurementSchema measurementSchema) {
+ MeasurementPath seriesPath =
+
node.getDevicePath().concatAsMeasurementPath(measurementSchema.getMeasurementId());
+ seriesPath.setMeasurementSchema(measurementSchema);
OperatorContext operatorContext =
context
.getDriverContext()
@@ -2915,7 +2876,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
- AlignedLastQueryScanNode node, AlignedPath unCachedPath,
LocalExecutionPlanContext context) {
+ PlanNodeId planNodeId, AlignedPath unCachedPath,
LocalExecutionPlanContext context) {
// last_time, last_value
List<Aggregator> aggregators = new ArrayList<>();
boolean canUseStatistics = true;
@@ -2939,11 +2900,11 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
- node.getPlanNodeId(),
+ planNodeId,
AlignedSeriesAggregationScanOperator.class.getSimpleName());
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
- node.getPlanNodeId(),
+ planNodeId,
unCachedPath,
Ordering.DESC,
scanOptionsBuilder.build(),
@@ -2960,18 +2921,22 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
@Override
- public Operator visitAlignedLastQueryScan(
- AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
- AlignedPath alignedPath = node.getSeriesPath();
- PartialPath devicePath = alignedPath.getDevicePath();
- // get series under aligned entity that has not been cached
+ public Operator visitLastQueryScan(LastQueryScanNode node,
LocalExecutionPlanContext context) {
+ DATA_NODE_SCHEMA_CACHE.cleanUp();
+ final PartialPath devicePath = node.getDevicePath();
+ List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
- List<String> measurementList = alignedPath.getMeasurementList();
- for (int i = 0; i < measurementList.size(); i++) {
- PartialPath measurementPath =
devicePath.concatNode(measurementList.get(i));
+ Filter filter =
+ updateFilterUsingTTL(
+ context.getGlobalTimeFilter(),
+ DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes()));
+ for (int i = 0; i < idxOfMeasurementSchemas.size(); i++) {
+ IMeasurementSchema measurementSchema = node.getMeasurementSchema(i);
+ final PartialPath measurementPath =
+ devicePath.concatNode(measurementSchema.getMeasurementId());
TimeValuePair timeValuePair = null;
+ context.dataNodeQueryContext.lock(node.isDeviceInMultiRegion());
try {
- context.dataNodeQueryContext.lock();
if (!context.dataNodeQueryContext.unCached(measurementPath)) {
timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
if (timeValuePair == null) {
@@ -2980,18 +2945,15 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
}
} finally {
- context.dataNodeQueryContext.unLock();
+ context.dataNodeQueryContext.unLock(node.isDeviceInMultiRegion());
}
if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
} else if (timeValuePair.getValue() == null) {
// there is no data for this time series, just ignore
- } else if (!LastQueryUtil.satisfyFilter(
- updateFilterUsingTTL(
- context.getGlobalTimeFilter(),
- DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes())),
- timeValuePair)) { // cached last value is not satisfied
+ } else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
+ // cached last value is not satisfied
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
// time filter is not > or >=, we still need to read from disk
@@ -3007,12 +2969,33 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
if (unCachedMeasurementIndexes.isEmpty()) {
return null;
+ }
+ if (node.isAligned()) {
+ AlignedPath unCachedPath = new AlignedPath(node.getDevicePath());
+ for (int i : unCachedMeasurementIndexes) {
+ IMeasurementSchema measurementSchema = node.getMeasurementSchema(i);
+ unCachedPath.addMeasurement(measurementSchema.getMeasurementId(),
measurementSchema);
+ }
+ return createAlignedUpdateLastCacheOperator(
+ node.getOutputViewPath(),
+ node.getPlanNodeId(),
+ unCachedPath,
+ context,
+ node.isDeviceInMultiRegion());
} else {
- AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath());
+ List<Operator> operators = new
ArrayList<>(unCachedMeasurementIndexes.size());
for (int i : unCachedMeasurementIndexes) {
- unCachedPath.addMeasurement(measurementList.get(i),
alignedPath.getSchemaList().get(i));
+ Operator operator = createUpdateLastCacheOperator(node, context, i);
+ operators.add(operator);
}
- return createAlignedUpdateLastCacheOperator(node, unCachedPath, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LastQueryCollectOperator.class.getSimpleName());
+ return new LastQueryCollectOperator(operatorContext, operators);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index 34d48ea8a72..5016a9f1df2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
@@ -134,11 +133,6 @@ public class SubPlanTypeExtractor {
return null;
}
- @Override
- public Void visitAlignedLastQueryScan(AlignedLastQueryScanNode node, Void
context) {
- return null;
- }
-
@Override
public Void visitLastQuery(LastQueryNode node, Void context) {
if (node.isContainsLastTransformNode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index e5983b6a18c..f390d90c842 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -61,7 +61,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
@@ -195,12 +194,6 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return processNoChildSourceNode(node, context);
}
- @Override
- public PlanNode visitAlignedLastQueryScan(
- AlignedLastQueryScanNode node, NodeGroupContext context) {
- return processNoChildSourceNode(node, context);
- }
-
@Override
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, NodeGroupContext context) {
@@ -263,7 +256,13 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
@Override
public PlanNode visitLastQuery(LastQueryNode node, NodeGroupContext context)
{
- return processMultiChildNode(node, context);
+ // At this point, there should only be LastSeriesSourceNode in
LastQueryNode, and all of them
+ // have been grouped in the rewriteSource stage by region.
+ context.putNodeDistribution(
+ node.getPlanNodeId(),
+ new NodeDistribution(
+ NodeDistributionType.SAME_WITH_ALL_CHILDREN,
node.getRegionReplicaSetByFirstChild()));
+ return node;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 2cdd071c931..5f01e646b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
@@ -63,6 +64,17 @@ public class NodeGroupContext {
}
private void countRegionOfSourceNodes(PlanNode root, Map<TRegionReplicaSet,
Long> result) {
+ if (root instanceof LastQueryNode) {
+ // At this point, there should only be LastSeriesSourceNode in
LastQueryNode, and all of them
+ // have been grouped in the rewriteSource stage by region.
+ TRegionReplicaSet regionReplicaSet = ((LastQueryNode)
root).getRegionReplicaSetByFirstChild();
+ if (regionReplicaSet != DataPartition.NOT_ASSIGNED) {
+ result.compute(
+ regionReplicaSet,
+ (region, count) -> (count == null) ? 1 : count +
root.getChildren().size());
+ }
+ return;
+ }
root.getChildren().forEach(child -> countRegionOfSourceNodes(child,
result));
if (root instanceof SourceNode) {
TRegionReplicaSet regionReplicaSet = ((SourceNode)
root).getRegionReplicaSet();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d9e91787ad4..e0a193b63f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -38,7 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
@@ -107,7 +107,8 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
// compute dataNodeSeriesScanNum in LastQueryScanNode
if (analysis.getStatement() instanceof QueryStatement
- && ((QueryStatement) analysis.getStatement()).isLastQuery()) {
+ && ((QueryStatement) analysis.getStatement()).isLastQuery()
+ && queryContext.needUpdateScanNumForLastQuery()) {
final Map<Path, AtomicInteger> pathSumMap = new HashMap<>();
dataNodeFIMap
.values()
@@ -123,8 +124,11 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
}
private void updateScanNum(PlanNode planNode, Map<Path, AtomicInteger>
pathSumMap) {
- if (planNode instanceof LastSeriesSourceNode) {
- LastSeriesSourceNode lastSeriesSourceNode = (LastSeriesSourceNode)
planNode;
+ if (planNode instanceof LastQueryScanNode) {
+ LastQueryScanNode lastSeriesSourceNode = (LastQueryScanNode) planNode;
+ if (!lastSeriesSourceNode.isDeviceInMultiRegion()) {
+ return;
+ }
pathSumMap.merge(
lastSeriesSourceNode.getSeriesPath(),
lastSeriesSourceNode.getDataNodeSeriesScanNum(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 92fad2006bb..1724d2d639d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -20,11 +20,14 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
@@ -61,7 +64,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.Inner
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
@@ -83,7 +85,10 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Binary;
import java.util.ArrayList;
import java.util.Arrays;
@@ -773,14 +778,6 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return processRawSeriesScan(node, context, mergeNode);
}
- @Override
- public List<PlanNode> visitAlignedLastQueryScan(
- AlignedLastQueryScanNode node, DistributionPlanContext context) {
- LastQueryNode mergeNode =
- new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(),
null, false);
- return processRawSeriesScan(node, context, mergeNode);
- }
-
private List<PlanNode> processRegionScan(RegionScanNode node,
DistributionPlanContext context) {
List<PlanNode> planNodeList = splitRegionScanNodeByRegion(node, context);
if (planNodeList.size() == 1) {
@@ -975,8 +972,14 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// For last query, we need to keep every FI's root node is
LastQueryMergeNode. So we
// force every region group have a parent node even if there is only 1
child for it.
context.setForceAddParent();
- PlanNode root = processRawMultiChildNode(node, context, false);
- if (context.queryMultiRegion) {
+ boolean isLastQueryWithTransformNode = node.isContainsLastTransformNode();
+ PlanNode root = processRawMultiChildNode(node, context, false,
isLastQueryWithTransformNode);
+ // For LastQueryNode, we force the LastQueryTransformNode to be split from
the new cloned
+ // LastQueryNode for some subsequent optimizations. In the case of
multiple regions, we do not
+ // need to do anything to achieve this. The judgement of
'isLastQueryWithTransformNode' here
+ // is only for the case where the query involves only a single region. See
this document for
+ //
details (https://docs.google.com/document/d/1w_weCIr39htOUbkHk2ffGVz2-kqBfdvLSZ2EblJaHMo).
+ if (context.queryMultiRegion || isLastQueryWithTransformNode) {
PlanNode newRoot = genLastQueryRootNode(node, context);
// add sort op for each if we add LastQueryMergeNode as root
if (newRoot instanceof LastQueryMergeNode &&
!node.needOrderByTimeseries()) {
@@ -990,9 +993,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
}
private void addSortForEachLastQueryNode(PlanNode root, Ordering
timeseriesOrdering) {
- if (root instanceof LastQueryNode
- && (root.getChildren().get(0) instanceof LastQueryScanNode
- || root.getChildren().get(0) instanceof AlignedLastQueryScanNode))
{
+ if (root instanceof LastQueryNode && (root.getChildren().get(0) instanceof
LastQueryScanNode)) {
LastQueryNode lastQueryNode = (LastQueryNode) root;
lastQueryNode.setTimeseriesOrdering(timeseriesOrdering);
// sort children node
@@ -1004,8 +1005,6 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
String sortKey = "";
if (child instanceof LastQueryScanNode) {
sortKey = ((LastQueryScanNode)
child).getOutputSymbolForSort();
- } else if (child instanceof AlignedLastQueryScanNode) {
- sortKey = ((AlignedLastQueryScanNode)
child).getOutputSymbolForSort();
}
return sortKey;
}))
@@ -1014,11 +1013,18 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
.getChildren()
.forEach(
child -> {
- if (child instanceof AlignedLastQueryScanNode) {
- // sort the measurements of AlignedPath for
LastQueryMergeOperator
- ((AlignedLastQueryScanNode) child)
- .getSeriesPath()
- .sortMeasurement(Comparator.naturalOrder());
+ if (child instanceof LastQueryScanNode) {
+ // sort the measurements for LastQueryMergeOperator
+ LastQueryScanNode node = (LastQueryScanNode) child;
+ ((LastQueryScanNode) child)
+ .getIdxOfMeasurementSchemas()
+ .sort(
+ Comparator.comparing(
+ idx ->
+ new Binary(
+
node.getMeasurementSchema(idx).getMeasurementId(),
+ TSFileConfig.STRING_CHARSET),
+ Comparator.naturalOrder()));
}
});
} else {
@@ -1223,12 +1229,15 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
if (containsAggregationSource(node)) {
return planAggregationWithTimeJoin(node, context);
}
- return Collections.singletonList(processRawMultiChildNode(node, context,
true));
+ return Collections.singletonList(processRawMultiChildNode(node, context,
true, false));
}
// Only `visitFullOuterTimeJoin` and `visitLastQuery` invoke this method
private PlanNode processRawMultiChildNode(
- MultiChildProcessNode node, DistributionPlanContext context, boolean
isTimeJoin) {
+ MultiChildProcessNode node,
+ DistributionPlanContext context,
+ boolean isTimeJoin,
+ boolean isLastQueryWithTransformNode) {
MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
groupBySourceNodes(node, context);
@@ -1248,21 +1257,27 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// If `forceAddParent` is true, we should not create new MultiChildNode
as the parent, either.
// At last, we can use the parameter `addParent` to judge whether to
create new
// MultiChildNode.
+ // For LastQueryNode, we force the LastQueryTransformNode to be split
from the new cloned
+ // LastQueryNode for some subsequent optimizations. In the case of
multiple regions, we do not
+ // need to do anything to achieve this. The judgement of
'isLastQueryWithTransformNode' here
+ // is only for the case where the query involves only a single region.
See this document for
+ //
details(https://docs.google.com/document/d/1w_weCIr39htOUbkHk2ffGVz2-kqBfdvLSZ2EblJaHMo).
boolean appendToRootDirectly =
- sourceGroup.size() == 1 || (!addParent &&
!context.isForceAddParent());
+ !isLastQueryWithTransformNode
+ && (sourceGroup.size() == 1 || (!addParent &&
!context.isForceAddParent()));
if (appendToRootDirectly) {
// In non-last query, this code can be reached at most once
// And we set region as MainFragmentLocatedRegion, the other Regions
should transfer data to
// this region
context.queryContext.setMainFragmentLocatedRegion(region);
- seriesScanNodes.forEach(root::addChild);
+ root.addChildren(seriesScanNodes);
addParent = true;
} else {
// We clone a TimeJoinNode from root to make the params to be
consistent.
// But we need to assign a new ID to it
MultiChildProcessNode parentOfGroup = (MultiChildProcessNode)
root.clone();
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- seriesScanNodes.forEach(parentOfGroup::addChild);
+ parentOfGroup.addChildren(seriesScanNodes);
root.addChild(parentOfGroup);
}
}
@@ -1274,7 +1289,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// SeriesScanNode or SeriesAggregateScanNode
// So this branch should not be touched.
List<PlanNode> children = visit(child, context);
- children.forEach(root::addChild);
+ root.addChildren(children);
}
}
return root;
@@ -1285,26 +1300,38 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
// Step 1: Get all source nodes. For the node which is not source, add it
as the child of
// current TimeJoinNode
List<SourceNode> sources = new ArrayList<>();
+ Map<String, Map<Integer, List<TRegionReplicaSet>>> cachedRegionReplicas =
new HashMap<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesSourceNode) {
// If the child is SeriesScanNode, we need to check whether this node
should be separated
// into several splits.
SeriesSourceNode sourceNode = (SeriesSourceNode) child;
List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(
- sourceNode.getPartitionPath(),
context.getPartitionTimeFilter());
- if (dataDistribution.size() > 1) {
+ getDeviceReplicaSets(
+ sourceNode.getPartitionPath().getDevice(),
+ context.getPartitionTimeFilter(),
+ cachedRegionReplicas);
+ boolean deviceInMultiRegion = dataDistribution.size() > 1;
+ if (deviceInMultiRegion) {
// If there is some series which is distributed in multi DataRegions
context.setOneSeriesInMultiRegion(true);
}
// If the size of dataDistribution is N, this SeriesScanNode should be
separated into N
// SeriesScanNode.
for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
+ SeriesSourceNode split =
+ (SeriesSourceNode) (deviceInMultiRegion ? sourceNode.clone() :
sourceNode);
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
+ if (split instanceof LastQueryScanNode) {
+ ((LastQueryScanNode)
split).setDeviceInMultiRegion(deviceInMultiRegion);
+ }
sources.add(split);
}
+
+ if (deviceInMultiRegion) {
+ context.getQueryContext().setNeedUpdateScanNumForLastQuery(true);
+ }
}
}
@@ -1318,6 +1345,69 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
return sourceGroup;
}
+ private List<TRegionReplicaSet> getDeviceReplicaSets(
+ String deviceID,
+ Filter timeFilter,
+ Map<String, Map<Integer, List<TRegionReplicaSet>>> cache) {
+ DataPartition dataPartition = analysis.getDataPartitionInfo();
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ dataPartitionMap = dataPartition.getDataPartitionMap();
+
+ String db = null;
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesPartitionMap =
+ null;
+ for (Map.Entry<
+ String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ entry : dataPartitionMap.entrySet()) {
+ if (PathUtils.isStartWith(deviceID, entry.getKey())) {
+ db = entry.getKey();
+ seriesPartitionMap = entry.getValue();
+ break;
+ }
+ }
+ if (seriesPartitionMap == null) {
+ return Collections.singletonList(NOT_ASSIGNED);
+ }
+
+ Map<Integer, List<TRegionReplicaSet>> slot2ReplicasMap =
+ cache.computeIfAbsent(db, k -> new HashMap<>());
+ TSeriesPartitionSlot tSeriesPartitionSlot =
dataPartition.calculateDeviceGroupId(deviceID);
+
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
+ finalSeriesPartitionMap = seriesPartitionMap;
+ return slot2ReplicasMap.computeIfAbsent(
+ tSeriesPartitionSlot.slotId,
+ k ->
+ getDataRegionReplicaSetWithTimeFilter(
+ finalSeriesPartitionMap, tSeriesPartitionSlot, timeFilter));
+ }
+
+ public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>
+ seriesPartitionMap,
+ TSeriesPartitionSlot tSeriesPartitionSlot,
+ Filter timeFilter) {
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> regionReplicaSetMap =
+ seriesPartitionMap.getOrDefault(tSeriesPartitionSlot,
Collections.emptyMap());
+ if (regionReplicaSetMap.isEmpty()) {
+ return Collections.singletonList(NOT_ASSIGNED);
+ }
+ List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+ Set<TRegionReplicaSet> uniqueValues = new HashSet<>();
+ for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry :
+ regionReplicaSetMap.entrySet()) {
+ if (!TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
entry.getKey().startTime)) {
+ continue;
+ }
+ for (TRegionReplicaSet tRegionReplicaSet : entry.getValue()) {
+ if (uniqueValues.add(tRegionReplicaSet)) {
+ replicaSets.add(tRegionReplicaSet);
+ }
+ }
+ }
+ return replicaSets;
+ }
+
private boolean containsAggregationSource(FullOuterTimeJoinNode node) {
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesAggregationScanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 0fda7777792..b8fefee7201 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -56,7 +56,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
@@ -517,23 +516,10 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
public List<String> visitLastQueryScan(LastQueryScanNode node, GraphContext
context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("LastQueryScan-%s",
node.getPlanNodeId().getId()));
- boxValue.add(String.format("Series: %s", node.getSeriesPath()));
- if (StringUtil.isNotBlank(node.getOutputViewPath())) {
- boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath()));
- }
- boxValue.add(printRegion(node.getRegionReplicaSet()));
- return render(node, boxValue, context);
- }
-
- @Override
- public List<String> visitAlignedLastQueryScan(
- AlignedLastQueryScanNode node, GraphContext context) {
- List<String> boxValue = new ArrayList<>();
- boxValue.add(String.format("AlignedLastQueryScan-%s",
node.getPlanNodeId().getId()));
+ boxValue.add(String.format("Aligned: %s", node.isAligned()));
boxValue.add(
String.format(
- "Series: %s%s",
- node.getSeriesPath().getDevice(),
node.getSeriesPath().getMeasurementList()));
+ "Series: %s%s", node.getDevicePath().getIDeviceID(),
node.getMeasurementSchemas()));
if (StringUtil.isNotBlank(node.getOutputViewPath())) {
boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath()));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 092f3601c57..6033233abbd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -96,7 +96,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
@@ -165,8 +164,10 @@ public enum PlanNodeType {
NODE_MANAGEMENT_MEMORY_MERGE((short) 42),
DELETE_DATA((short) 44),
DELETE_TIME_SERIES((short) 45),
- LAST_QUERY_SCAN((short) 46),
- ALIGNED_LAST_QUERY_SCAN((short) 47),
+ @Deprecated
+ DEPRECATED_LAST_QUERY_SCAN((short) 46),
+ @Deprecated
+ DEPRECATED_ALIGNED_LAST_QUERY_SCAN((short) 47),
LAST_QUERY((short) 48),
LAST_QUERY_MERGE((short) 49),
LAST_QUERY_COLLECT((short) 50),
@@ -224,6 +225,8 @@ public enum PlanNodeType {
DEVICE_SCHEMA_FETCH_SCAN((short) 96),
CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97),
+
+ LAST_QUERY_SCAN((short) 98),
;
public static final int BYTES = Short.BYTES;
@@ -374,9 +377,8 @@ public enum PlanNodeType {
case 45:
return DeleteTimeSeriesNode.deserialize(buffer);
case 46:
- return LastQueryScanNode.deserialize(buffer);
case 47:
- return AlignedLastQueryScanNode.deserialize(buffer);
+ throw new UnsupportedOperationException("This LastQueryScanNode is
deprecated");
case 48:
return LastQueryNode.deserialize(buffer);
case 49:
@@ -478,6 +480,8 @@ public enum PlanNodeType {
case 97:
throw new UnsupportedOperationException(
"You should never see ContinuousSameSearchIndexSeparatorNode in
this function, because ContinuousSameSearchIndexSeparatorNode should never be
used in network transmission.");
+ case 98:
+ return LastQueryScanNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 92261d7ad47..f018817ffe6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -96,7 +96,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode;
@@ -163,10 +162,6 @@ public abstract class PlanVisitor<R, C> {
return visitSourceNode(node, context);
}
- public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context)
{
- return visitSourceNode(node, context);
- }
-
public R visitRegionScan(RegionScanNode node, C context) {
return visitSourceNode(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java
index 02d8508e8fe..b3bd064c910 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java
@@ -54,6 +54,10 @@ public abstract class MultiChildProcessNode extends
ProcessNode {
this.children.add(child);
}
+ public void addChildren(List<? extends PlanNode> children) {
+ this.children.addAll(children);
+ }
+
@Override
public int allowedChildCount() {
return CHILD_COUNT_NO_LIMIT;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
index 5d0589e40d4..4ce3f29750c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -18,21 +18,34 @@
*/
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import javax.annotation.Nullable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
@@ -46,31 +59,84 @@ public class LastQueryNode extends MultiChildProcessNode {
// if children contains LastTransformNode, this variable is only used in
distribute plan
private boolean containsLastTransformNode;
+ // After Logical planning is completed, this map is no longer needed and it
will be set to null
+ private Map<IMeasurementSchema, Integer> measurementSchema2IdxMap;
+ // All LastSeriesSourceNode share this structure
+ private final List<IMeasurementSchema> globalMeasurementSchemaList;
+
public LastQueryNode(
PlanNodeId id, @Nullable Ordering timeseriesOrdering, boolean
containsLastTransformNode) {
super(id);
this.timeseriesOrdering = timeseriesOrdering;
this.containsLastTransformNode = containsLastTransformNode;
+ this.measurementSchema2IdxMap = new HashMap<>();
+ this.globalMeasurementSchemaList = new ArrayList<>();
}
public LastQueryNode(
PlanNodeId id,
- List<PlanNode> children,
@Nullable Ordering timeseriesOrdering,
- boolean containsLastTransformNode) {
- super(id, children);
+ boolean containsLastTransformNode,
+ List<IMeasurementSchema> globalMeasurementSchemaList) {
+ super(id);
this.timeseriesOrdering = timeseriesOrdering;
this.containsLastTransformNode = containsLastTransformNode;
+ this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
+ public long addDeviceLastQueryScanNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean aligned,
+ List<IMeasurementSchema> measurementSchemas,
+ String outputViewPath) {
+ List<Integer> idxList = new ArrayList<>(measurementSchemas.size());
+ for (IMeasurementSchema measurementSchema : measurementSchemas) {
+ int idx =
+ measurementSchema2IdxMap.computeIfAbsent(
+ measurementSchema,
+ key -> {
+ this.globalMeasurementSchemaList.add(key);
+ return globalMeasurementSchemaList.size() - 1;
+ });
+ idxList.add(idx);
+ }
+ LastQueryScanNode scanNode =
+ new LastQueryScanNode(
+ id, devicePath, aligned, idxList, outputViewPath,
globalMeasurementSchemaList);
+ children.add(scanNode);
+ return scanNode.ramBytesUsed();
}
- @Override
- public void addChild(PlanNode child) {
- children.add(child);
+ public void sort() {
+ if (timeseriesOrdering == null) {
+ return;
+ }
+ children.sort(
+ Comparator.comparing(
+ child -> {
+ String sortKey = "";
+ if (child instanceof LastQueryScanNode) {
+ sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort();
+ } else if (child instanceof LastQueryTransformNode) {
+ sortKey = ((LastQueryTransformNode)
child).getOutputSymbolForSort();
+ }
+ return sortKey;
+ }));
+ if (timeseriesOrdering.equals(Ordering.DESC)) {
+ Collections.reverse(children);
+ }
+ }
+
+ public void clearMeasurementSchema2IdxMap() {
+ this.measurementSchema2IdxMap = null;
+ }
+
+ public long getMemorySizeOfSharedStructures() {
+ // MeasurementSchema comes from path, memory has been calculated before
+ return RamUsageEstimator.alignObjectSize(
+ RamUsageEstimator.shallowSizeOf(globalMeasurementSchemaList)
+ +
RamUsageEstimator.sizeOfObjectArray(globalMeasurementSchemaList.size()));
}
@Override
@@ -80,12 +146,11 @@ public class LastQueryNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryNode(getPlanNodeId(), timeseriesOrdering,
containsLastTransformNode);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
+ return new LastQueryNode(
+ getPlanNodeId(),
+ timeseriesOrdering,
+ containsLastTransformNode,
+ globalMeasurementSchemaList);
}
@Override
@@ -135,6 +200,15 @@ public class LastQueryNode extends MultiChildProcessNode {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
ReadWriteIOUtils.write(timeseriesOrdering.ordinal(), byteBuffer);
}
+ ReadWriteIOUtils.write(globalMeasurementSchemaList.size(), byteBuffer);
+ for (IMeasurementSchema measurementSchema : globalMeasurementSchemaList) {
+ if (measurementSchema instanceof MeasurementSchema) {
+ ReadWriteIOUtils.write((byte) 0, byteBuffer);
+ } else if (measurementSchema instanceof VectorMeasurementSchema) {
+ ReadWriteIOUtils.write((byte) 1, byteBuffer);
+ }
+ measurementSchema.serializeTo(byteBuffer);
+ }
}
@Override
@@ -146,6 +220,15 @@ public class LastQueryNode extends MultiChildProcessNode {
ReadWriteIOUtils.write((byte) 1, stream);
ReadWriteIOUtils.write(timeseriesOrdering.ordinal(), stream);
}
+ ReadWriteIOUtils.write(globalMeasurementSchemaList.size(), stream);
+ for (IMeasurementSchema measurementSchema : globalMeasurementSchemaList) {
+ if (measurementSchema instanceof MeasurementSchema) {
+ ReadWriteIOUtils.write((byte) 0, stream);
+ } else if (measurementSchema instanceof VectorMeasurementSchema) {
+ ReadWriteIOUtils.write((byte) 1, stream);
+ }
+ measurementSchema.serializeTo(stream);
+ }
}
public static LastQueryNode deserialize(ByteBuffer byteBuffer) {
@@ -154,8 +237,18 @@ public class LastQueryNode extends MultiChildProcessNode {
if (needOrderByTimeseries == 1) {
timeseriesOrdering =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
}
+ int measurementSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<IMeasurementSchema> measurementSchemas = new
ArrayList<>(measurementSize);
+ for (int i = 0; i < measurementSize; i++) {
+ byte type = ReadWriteIOUtils.readByte(byteBuffer);
+ if (type == 0) {
+ measurementSchemas.add(MeasurementSchema.deserializeFrom(byteBuffer));
+ } else if (type == 1) {
+
measurementSchemas.add(VectorMeasurementSchema.deserializeFrom(byteBuffer));
+ }
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryNode(planNodeId, timeseriesOrdering, false);
+ return new LastQueryNode(planNodeId, timeseriesOrdering, false,
measurementSchemas);
}
@Override
@@ -163,6 +256,15 @@ public class LastQueryNode extends MultiChildProcessNode {
this.children = children;
}
+ @Override
+ public void addChild(PlanNode child) {
+ if (child instanceof LastQueryScanNode) {
+ LastQueryScanNode childNode = (LastQueryScanNode) child;
+ childNode.setGlobalMeasurementSchemaList(globalMeasurementSchemaList);
+ }
+ super.addChild(child);
+ }
+
public Ordering getTimeseriesOrdering() {
return timeseriesOrdering;
}
@@ -182,4 +284,11 @@ public class LastQueryNode extends MultiChildProcessNode {
public boolean needOrderByTimeseries() {
return timeseriesOrdering != null;
}
+
+ // Before calling this method, you need to ensure that the current
LastQueryNode
+ // has been divided according to RegionReplicaSet.
+ public TRegionReplicaSet getRegionReplicaSetByFirstChild() {
+ SourceNode planNode = (SourceNode) children.get(0);
+ return planNode.getRegionReplicaSet();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
deleted file mode 100644
index c00e7ed81ec..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
-
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.path.AlignedPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathDeserializeUtil;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.tsfile.utils.RamUsageEstimator;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.eclipse.jetty.util.StringUtil;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-
-public class AlignedLastQueryScanNode extends LastSeriesSourceNode {
- private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class);
-
- // The path of the target series which will be scanned.
- private final AlignedPath seriesPath;
-
- // The id of DataRegion where the node will run
- private TRegionReplicaSet regionReplicaSet;
-
- private final String outputViewPath;
-
- public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath,
String outputViewPath) {
- super(id, new AtomicInteger(1));
- this.seriesPath = seriesPath;
- this.outputViewPath = outputViewPath;
- }
-
- public AlignedLastQueryScanNode(
- PlanNodeId id,
- AlignedPath seriesPath,
- AtomicInteger dataNodeSeriesScanNum,
- String outputViewPath,
- TRegionReplicaSet regionReplicaSet) {
- super(id, dataNodeSeriesScanNum);
- this.seriesPath = seriesPath;
- this.outputViewPath = outputViewPath;
- this.regionReplicaSet = regionReplicaSet;
- }
-
- public AlignedLastQueryScanNode(
- PlanNodeId id,
- AlignedPath seriesPath,
- AtomicInteger dataNodeSeriesScanNum,
- String outputViewPath) {
- super(id, dataNodeSeriesScanNum);
- this.seriesPath = seriesPath;
- this.outputViewPath = outputViewPath;
- }
-
- public String getOutputViewPath() {
- return outputViewPath;
- }
-
- @Override
- public void open() throws Exception {
- // Do nothing
- }
-
- @Override
- public TRegionReplicaSet getRegionReplicaSet() {
- return regionReplicaSet;
- }
-
- @Override
- public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
- this.regionReplicaSet = regionReplicaSet;
- }
-
- @Override
- public void close() throws Exception {
- // Do nothing
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of();
- }
-
- @Override
- public void addChild(PlanNode child) {
- throw new UnsupportedOperationException("no child is allowed for
SeriesScanNode");
- }
-
- @Override
- public PlanNodeType getType() {
- return PlanNodeType.ALIGNED_LAST_QUERY_SCAN;
- }
-
- @Override
- public PlanNode clone() {
- return new AlignedLastQueryScanNode(
- getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(),
outputViewPath, regionReplicaSet);
- }
-
- @Override
- public int allowedChildCount() {
- return NO_CHILD_ALLOWED;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return LAST_QUERY_HEADER_COLUMNS;
- }
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitAlignedLastQueryScan(this, context);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o;
- return Objects.equals(seriesPath, that.seriesPath)
- && Objects.equals(outputViewPath, that.outputViewPath)
- && Objects.equals(regionReplicaSet, that.regionReplicaSet);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), seriesPath, outputViewPath,
regionReplicaSet);
- }
-
- @Override
- public String toString() {
- if (StringUtil.isNotBlank(outputViewPath)) {
- return String.format(
- "AlignedLastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s,
DataRegion: %s]",
- this.getPlanNodeId(),
- this.getSeriesPath().getFormattedString(),
- this.getOutputViewPath(),
- PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet()));
- } else {
- return String.format(
- "AlignedLastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
- this.getPlanNodeId(),
- this.getSeriesPath().getFormattedString(),
- PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet()));
- }
- }
-
- @Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer);
- seriesPath.serialize(byteBuffer);
- ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer);
- ReadWriteIOUtils.write(outputViewPath == null, byteBuffer);
- if (outputViewPath != null) {
- ReadWriteIOUtils.write(outputViewPath, byteBuffer);
- }
- }
-
- @Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(stream);
- seriesPath.serialize(stream);
- ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream);
- ReadWriteIOUtils.write(outputViewPath == null, stream);
- if (outputViewPath != null) {
- ReadWriteIOUtils.write(outputViewPath, stream);
- }
- }
-
- public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) {
- AlignedPath partialPath = (AlignedPath)
PathDeserializeUtil.deserialize(byteBuffer);
- int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer);
- boolean isNull = ReadWriteIOUtils.readBool(byteBuffer);
- String outputPathSymbol = isNull ? null :
ReadWriteIOUtils.readString(byteBuffer);
- PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AlignedLastQueryScanNode(
- planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum),
outputPathSymbol);
- }
-
- public AlignedPath getSeriesPath() {
- return seriesPath;
- }
-
- public String getOutputSymbolForSort() {
- if (outputViewPath != null) {
- return outputViewPath;
- }
- if (seriesPath.getMeasurementList().size() > 1) {
- return seriesPath.getDevice();
- }
- return seriesPath.transformToPartialPath().getFullPath();
- }
-
- @Override
- public PartialPath getPartitionPath() {
- return getSeriesPath();
- }
-
- @Override
- public long ramBytesUsed() {
- return INSTANCE_SIZE
- + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
- + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath)
- + RamUsageEstimator.sizeOf(outputViewPath);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
index 8d4f8291a62..d7908e6b759 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
@@ -33,14 +33,17 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import com.google.common.collect.ImmutableList;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.eclipse.jetty.util.StringUtil;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class LastQueryScanNode extends LastSeriesSourceNode {
@@ -53,40 +56,85 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
ColumnHeaderConstant.VALUE,
ColumnHeaderConstant.DATATYPE);
- // The path of the target series which will be scanned.
- private final MeasurementPath seriesPath;
+ private final PartialPath devicePath;
+ private final boolean aligned;
+ private final List<Integer> indexOfMeasurementSchemas;
+ // This structure does not need to be serialized or deserialized.
+ // It will be set when the current Node is added to the child by the upper LastQueryNode.
+ private List<IMeasurementSchema> globalMeasurementSchemaList;
private final String outputViewPath;
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
+ private boolean deviceInMultiRegion = false;
- public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, String outputViewPath) {
+ public LastQueryScanNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean aligned,
+ List<Integer> indexOfMeasurementSchemas,
+ String outputViewPath,
+ List<IMeasurementSchema> globalMeasurementSchemaList) {
super(id, new AtomicInteger(1));
- this.seriesPath = seriesPath;
+ this.aligned = aligned;
+ this.devicePath = devicePath;
+ this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
this.outputViewPath = outputViewPath;
+ this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}
public LastQueryScanNode(
PlanNodeId id,
- MeasurementPath seriesPath,
+ PartialPath devicePath,
+ boolean aligned,
+ List<Integer> indexOfMeasurementSchemas,
AtomicInteger dataNodeSeriesScanNum,
String outputViewPath) {
+ this(
+ id,
+ devicePath,
+ aligned,
+ indexOfMeasurementSchemas,
+ dataNodeSeriesScanNum,
+ outputViewPath,
+ null);
+ }
+
+ public LastQueryScanNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean aligned,
+ List<Integer> indexOfMeasurementSchemas,
+ AtomicInteger dataNodeSeriesScanNum,
+ String outputViewPath,
+ List<IMeasurementSchema> globalMeasurementSchemaList) {
super(id, dataNodeSeriesScanNum);
- this.seriesPath = seriesPath;
+ this.aligned = aligned;
+ this.devicePath = devicePath;
+ this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
this.outputViewPath = outputViewPath;
+ this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}
public LastQueryScanNode(
PlanNodeId id,
- MeasurementPath seriesPath,
+ PartialPath devicePath,
+ boolean aligned,
+ List<Integer> indexOfMeasurementSchemas,
AtomicInteger dataNodeSeriesScanNum,
String outputViewPath,
- TRegionReplicaSet regionReplicaSet) {
+ TRegionReplicaSet regionReplicaSet,
+ boolean deviceInMultiRegion,
+ List<IMeasurementSchema> globalMeasurementSchemaList) {
super(id, dataNodeSeriesScanNum);
- this.seriesPath = seriesPath;
+ this.devicePath = devicePath;
+ this.aligned = aligned;
+ this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
this.outputViewPath = outputViewPath;
this.regionReplicaSet = regionReplicaSet;
+ this.deviceInMultiRegion = deviceInMultiRegion;
+ this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}
@Override
@@ -102,8 +150,12 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
this.regionReplicaSet = regionReplicaSet;
}
- public MeasurementPath getSeriesPath() {
- return seriesPath;
+ public PartialPath getSeriesPath() {
+ return devicePath;
+ }
+
+ public boolean isAligned() {
+ return this.aligned;
}
public String getOutputViewPath() {
@@ -114,7 +166,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
if (outputViewPath != null) {
return outputViewPath;
}
- return seriesPath.getFullPath();
+ return devicePath.toString();
}
@Override
@@ -138,7 +190,15 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
@Override
public PlanNode clone() {
return new LastQueryScanNode(
- getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(), outputViewPath, regionReplicaSet);
+ getPlanNodeId(),
+ devicePath,
+ aligned,
+ indexOfMeasurementSchemas,
+ getDataNodeSeriesScanNum(),
+ outputViewPath,
+ regionReplicaSet,
+ deviceInMultiRegion,
+ globalMeasurementSchemaList);
}
@Override
@@ -162,30 +222,42 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
LastQueryScanNode that = (LastQueryScanNode) o;
- return Objects.equals(seriesPath, that.seriesPath)
+ return Objects.equals(devicePath, that.devicePath)
+ && Objects.equals(aligned, that.aligned)
+ && Objects.equals(indexOfMeasurementSchemas, that.indexOfMeasurementSchemas)
&& Objects.equals(outputViewPath, that.outputViewPath)
&& Objects.equals(regionReplicaSet, that.regionReplicaSet);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), seriesPath, outputViewPath, regionReplicaSet);
+ return Objects.hash(
+ super.hashCode(),
+ devicePath,
+ aligned,
+ indexOfMeasurementSchemas,
+ outputViewPath,
+ regionReplicaSet);
}
@Override
public String toString() {
if (StringUtil.isNotBlank(outputViewPath)) {
return String.format(
- "LastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s, DataRegion: %s]",
+ "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, ViewPath: %s, DataRegion: %s]",
this.getPlanNodeId(),
- this.getSeriesPath(),
+ this.getDevicePath(),
+ this.aligned,
+ this.getMeasurementSchemas(),
this.getOutputViewPath(),
PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet()));
} else {
return String.format(
- "LastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+ "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, DataRegion: %s]",
this.getPlanNodeId(),
- this.getSeriesPath(),
+ this.getDevicePath(),
+ this.aligned,
+ this.getMeasurementSchemas(),
PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet()));
}
}
@@ -193,53 +265,106 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
- seriesPath.serialize(byteBuffer);
+ devicePath.serialize(byteBuffer);
+ ReadWriteIOUtils.write(aligned, byteBuffer);
+ ReadWriteIOUtils.write(indexOfMeasurementSchemas.size(), byteBuffer);
+ for (Integer measurementSchema : indexOfMeasurementSchemas) {
+ ReadWriteIOUtils.write(measurementSchema, byteBuffer);
+ }
ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer);
ReadWriteIOUtils.write(outputViewPath == null, byteBuffer);
if (outputViewPath != null) {
ReadWriteIOUtils.write(outputViewPath, byteBuffer);
}
+ ReadWriteIOUtils.write(deviceInMultiRegion, byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
PlanNodeType.LAST_QUERY_SCAN.serialize(stream);
- seriesPath.serialize(stream);
+ devicePath.serialize(stream);
+ ReadWriteIOUtils.write(aligned, stream);
+ ReadWriteIOUtils.write(indexOfMeasurementSchemas.size(), stream);
+ for (Integer measurementSchema : indexOfMeasurementSchemas) {
+ ReadWriteIOUtils.write(measurementSchema, stream);
+ }
ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream);
ReadWriteIOUtils.write(outputViewPath == null, stream);
if (outputViewPath != null) {
ReadWriteIOUtils.write(outputViewPath, stream);
}
+ ReadWriteIOUtils.write(deviceInMultiRegion, stream);
}
public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
- MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
+ PartialPath devicePath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+ boolean aligned = ReadWriteIOUtils.readBool(byteBuffer);
+ int measurementSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Integer> measurementSchemas = new ArrayList<>(measurementSize);
+ for (int i = 0; i < measurementSize; i++) {
+ measurementSchemas.add(ReadWriteIOUtils.readInt(byteBuffer));
+ }
+
int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer);
boolean isNull = ReadWriteIOUtils.readBool(byteBuffer);
String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer);
+ boolean deviceInMultiRegion = ReadWriteIOUtils.readBool(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new LastQueryScanNode(
- planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum), outputPathSymbol);
+ planNodeId,
+ devicePath,
+ aligned,
+ measurementSchemas,
+ new AtomicInteger(dataNodeSeriesScanNum),
+ outputPathSymbol,
+ null,
+ deviceInMultiRegion,
+ null);
}
- @Override
- public PartialPath getPartitionPath() {
- return getSeriesPath();
+ public void setGlobalMeasurementSchemaList(List<IMeasurementSchema> globalMeasurementSchemaList) {
+ this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}
- public String outputPathSymbol() {
- if (outputViewPath == null) {
- return seriesPath.getFullPath();
- } else {
- return outputViewPath;
- }
+ public IMeasurementSchema getMeasurementSchema(int idx) {
+ int globalIdx = indexOfMeasurementSchemas.get(idx);
+ return globalMeasurementSchemaList.get(globalIdx);
+ }
+
+ public PartialPath getDevicePath() {
+ return this.devicePath;
+ }
+
+ public boolean isDeviceInMultiRegion() {
+ return deviceInMultiRegion;
+ }
+
+ public void setDeviceInMultiRegion(boolean deviceInMultiRegion) {
+ this.deviceInMultiRegion = deviceInMultiRegion;
+ }
+
+ public List<Integer> getIdxOfMeasurementSchemas() {
+ return indexOfMeasurementSchemas;
+ }
+
+ public List<IMeasurementSchema> getMeasurementSchemas() {
+ return indexOfMeasurementSchemas.stream()
+ .map(globalMeasurementSchemaList::get)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return devicePath;
}
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id)
- + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath)
+ // The memory of each String has been calculated before
+ + MemoryEstimationHelper.getEstimatedSizeOfCopiedPartialPath(devicePath)
+ + MemoryEstimationHelper.getEstimatedSizeOfIntegerArrayList(indexOfMeasurementSchemas)
+ RamUsageEstimator.sizeOf(outputViewPath);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java
index 23139279d58..b69471a2c7c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
@@ -55,6 +54,7 @@ import org.junit.Test;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -191,7 +191,7 @@ public class LastQueryOperatorTest {
LastQueryOperator lastQueryOperator =
new LastQueryOperator(
driverContext.getOperatorContexts().get(4),
- ImmutableList.of(updateLastCacheOperator1,
updateLastCacheOperator2),
+ Arrays.asList(updateLastCacheOperator1,
updateLastCacheOperator2),
LastQueryUtil.createTsBlockBuilder());
int count = 0;
@@ -328,7 +328,7 @@ public class LastQueryOperatorTest {
LastQueryOperator lastQueryOperator =
new LastQueryOperator(
driverContext.getOperatorContexts().get(4),
- ImmutableList.of(updateLastCacheOperator1,
updateLastCacheOperator2),
+ Arrays.asList(updateLastCacheOperator1,
updateLastCacheOperator2),
builder);
int count = 0;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java
index e7c66a30970..da1ac9bf3ac 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java
@@ -40,7 +40,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
@@ -55,6 +54,7 @@ import org.junit.Test;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@@ -192,7 +192,7 @@ public class LastQuerySortOperatorTest {
new LastQuerySortOperator(
driverContext.getOperatorContexts().get(4),
LastQueryUtil.createTsBlockBuilder().build(),
- ImmutableList.of(updateLastCacheOperator1,
updateLastCacheOperator2),
+ Arrays.asList(updateLastCacheOperator1,
updateLastCacheOperator2),
Comparator.naturalOrder());
int count = 0;
@@ -329,7 +329,7 @@ public class LastQuerySortOperatorTest {
new LastQuerySortOperator(
driverContext.getOperatorContexts().get(4),
builder.build(),
- ImmutableList.of(updateLastCacheOperator2,
updateLastCacheOperator1),
+ Arrays.asList(updateLastCacheOperator2,
updateLastCacheOperator1),
Comparator.reverseOrder());
int count = 0;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java
index f0c930d0d00..7c425c32842 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java
@@ -21,8 +21,8 @@ package
org.apache.iotdb.db.queryengine.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
@@ -32,13 +32,10 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -194,21 +191,19 @@ public class LastQueryTest {
private LogicalQueryPlan constructLastQuery(List<String> paths,
MPPQueryContext context)
throws IllegalPathException {
- List<PlanNode> sourceNodeList = new ArrayList<>();
+ LastQueryNode root = new
LastQueryNode(context.getQueryId().genPlanNodeId(), null, false);
for (String path : paths) {
MeasurementPath selectPath = new MeasurementPath(path);
- if (selectPath.isUnderAlignedEntity()) {
- sourceNodeList.add(
- new AlignedLastQueryScanNode(
- context.getQueryId().genPlanNodeId(), new
AlignedPath(selectPath), null));
- } else {
- sourceNodeList.add(
- new LastQueryScanNode(context.getQueryId().genPlanNodeId(),
selectPath, null));
- }
+ PartialPath devicePath = selectPath.getDevicePath();
+ devicePath.setIDeviceID(selectPath.getDevice());
+ root.addDeviceLastQueryScanNode(
+ context.getQueryId().genPlanNodeId(),
+ devicePath,
+ selectPath.isUnderAlignedEntity(),
+ Collections.singletonList(selectPath.getMeasurementSchema()),
+ null);
}
- PlanNode root =
- new LastQueryNode(context.getQueryId().genPlanNodeId(),
sourceNodeList, null, false);
return new LogicalQueryPlan(context, root);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
index 993b15562ea..2e51898a1f4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
@@ -39,9 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAgg
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
@@ -52,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
@@ -76,31 +75,37 @@ public class DataQueryLogicalPlannerTest {
// fake initResultNodeContext()
queryId.genPlanNodeId();
- LastQueryScanNode d1s1 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s1"), null);
- LastQueryScanNode d1s2 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s2"), null);
- LastQueryScanNode d1s3 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s3"), null);
- LastQueryScanNode d2s1 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s1"), null);
- LastQueryScanNode d2s2 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s2"), null);
- LastQueryScanNode d2s4 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d2.s4"), null);
- AlignedLastQueryScanNode d2a =
- new AlignedLastQueryScanNode(
- queryId.genPlanNodeId(), (AlignedPath)
schemaMap.get("root.sg.d2.a"), null);
-
- List<PlanNode> sourceNodeList = Arrays.asList(d1s1, d1s2, d1s3, d2a, d2s1,
d2s2, d2s4);
- LastQueryNode lastQueryNode =
- new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList,
Ordering.ASC, false);
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s2")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s3")).getMeasurementSchema());
+ MeasurementPath d1s1Path = (MeasurementPath)
schemaMap.get("root.sg.d1.s1");
+ LastQueryNode lastQueryNode = new LastQueryNode(queryId.genPlanNodeId(),
Ordering.ASC, false);
+
+ lastQueryNode.addDeviceLastQueryScanNode(
+ queryId.genPlanNodeId(),
+ d1s1Path.getDevicePath(),
+ d1s1Path.isUnderAlignedEntity(),
+ measurementSchemas,
+ null);
+
+ measurementSchemas =
+ Arrays.asList(
+ ((MeasurementPath)
schemaMap.get("root.sg.d2.s1")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d2.s2")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d2.s4")).getMeasurementSchema());
+ MeasurementPath d2s1Path = (MeasurementPath)
schemaMap.get("root.sg.d2.s1");
+ lastQueryNode.addDeviceLastQueryScanNode(
+ queryId.genPlanNodeId(),
+ d2s1Path.getDevicePath(),
+ d2s1Path.isUnderAlignedEntity(),
+ measurementSchemas,
+ null);
+
+ AlignedPath aPath = (AlignedPath) schemaMap.get("root.sg.d2.a");
+ lastQueryNode.addDeviceLastQueryScanNode(
+ queryId.genPlanNodeId(), aPath.getDevicePath(), true,
aPath.getSchemaList(), null);
PlanNode actualPlan = parseSQLToPlanNode(sql);
Assert.assertEquals(actualPlan, lastQueryNode);
@@ -114,19 +119,20 @@ public class DataQueryLogicalPlannerTest {
// fake initResultNodeContext()
queryId.genPlanNodeId();
- LastQueryScanNode d1s3 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s3"), null);
- LastQueryScanNode d1s1 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s1"), null);
- LastQueryScanNode d1s2 =
- new LastQueryScanNode(
- queryId.genPlanNodeId(), (MeasurementPath)
schemaMap.get("root.sg.d1.s2"), null);
-
- List<PlanNode> sourceNodeList = Arrays.asList(d1s3, d1s1, d1s2);
- LastQueryNode lastQueryNode =
- new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList, null,
false);
+ LastQueryNode lastQueryNode = new LastQueryNode(queryId.genPlanNodeId(),
null, false);
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s3")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(),
+ ((MeasurementPath)
schemaMap.get("root.sg.d1.s2")).getMeasurementSchema());
+ MeasurementPath s3Path = (MeasurementPath) schemaMap.get("root.sg.d1.s3");
+ lastQueryNode.addDeviceLastQueryScanNode(
+ queryId.genPlanNodeId(),
+ s3Path.getDevicePath(),
+ s3Path.isUnderAlignedEntity(),
+ measurementSchemas,
+ null);
+
SortNode sortNode =
new SortNode(
queryId.genPlanNodeId(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java
new file mode 100644
index 00000000000..270411f3f51
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.node.source;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class LastQueryScanNodeSerdeTest {
+ @Test
+ public void test() throws IllegalPathException {
+ LastQueryScanNode node =
+ new LastQueryScanNode(
+ new PlanNodeId("test"),
+ new PartialPath("root.test.d1"),
+ true,
+ Arrays.asList(0, 1),
+ null,
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s0", TSDataType.BOOLEAN)));
+ ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+ assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node);
+
+ node =
+ new LastQueryScanNode(
+ new PlanNodeId("test"),
+ new PartialPath("root.test.d1"),
+ false,
+ Arrays.asList(0, 1),
+ null,
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s0", TSDataType.BOOLEAN)));
+ byteBuffer = ByteBuffer.allocate(2048);
+ node.serialize(byteBuffer);
+ byteBuffer.flip();
+ assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 4acf680443b..89931701072 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -118,14 +118,17 @@ public class DataPartition extends Partition {
}
public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
- String deviceName, Filter timeFilter) {
- String storageGroup = getStorageGroupByDevice(deviceName);
- TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
- if (!dataPartitionMap.containsKey(storageGroup)
- || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
+ final String deviceName, final Filter timeFilter) {
+ final String storageGroup = getStorageGroupByDevice(deviceName);
+ final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> regionReplicaSetMap =
+ dataPartitionMap
+ .getOrDefault(storageGroup, Collections.emptyMap())
+ .getOrDefault(seriesPartitionSlot, Collections.emptyMap());
+ if (regionReplicaSetMap.isEmpty()) {
return Collections.singletonList(NOT_ASSIGNED);
}
- return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
+ return regionReplicaSetMap.entrySet().stream()
.filter(
entry ->
TimePartitionUtils.satisfyPartitionStartTime(timeFilter, entry.getKey().startTime))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 25b71eb718d..29c92599f24 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -39,7 +39,7 @@ public abstract class Partition {
seriesSlotExecutorName, seriesPartitionSlotNum);
}
- protected TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
+ public TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
return executor.getSeriesPartitionSlot(deviceName);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index ceec1f17c4e..fc04c94ba5b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -202,6 +202,15 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return new PartialPath(newPathNodes);
}
+ public MeasurementPath concatAsMeasurementPath(String measurement) {
+ int len = nodes.length;
+ String[] newNodes = Arrays.copyOf(nodes, nodes.length + 1);
+ newNodes[len] = measurement;
+ MeasurementPath measurementPath = new MeasurementPath(newNodes);
+ measurementPath.device = this.device;
+ return measurementPath;
+ }
+
public String[] getNodes() {
return nodes;
}
@@ -758,6 +767,10 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return nodes[0];
}
+ public void setIDeviceID(String deviceID) {
+ this.device = deviceID;
+ }
+
@Override
public String getDevice() {
if (device != null) {