This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a877b82b819 Make the construction method of class
TableAggTableScanOperator tidy
a877b82b819 is described below
commit a877b82b819d750092ea45f1935db2a26c035882
Author: Beyyes <[email protected]>
AuthorDate: Fri Dec 20 09:12:33 2024 +0800
Make the construction method of class TableAggTableScanOperator tidy
---
.../TableAggregationTableScanOperator.java | 159 ++++++-------
.../plan/planner/TableOperatorGenerator.java | 251 ++++++++++-----------
2 files changed, 192 insertions(+), 218 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e963963bf7d..8ef9ddbbf04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow;
@@ -49,7 +49,6 @@ import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.BinaryColumn;
-import org.apache.tsfile.read.common.block.column.LongColumn;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -68,120 +67,97 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
import static
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
-public class TableAggregationTableScanOperator extends
AbstractSeriesAggregationScanOperator {
+public class TableAggregationTableScanOperator extends
AbstractDataSourceOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
- private final List<TableAggregator> tableAggregators;
+ private boolean finished = false;
+ private TsBlock inputTsBlock;
+ private final List<TableAggregator> tableAggregators;
private final List<ColumnSchema> groupingKeySchemas;
private final int[] groupingKeyIndex;
- public static final LongColumn TIME_COLUMN_TEMPLATE =
- new LongColumn(1, Optional.empty(), new long[] {0});
-
- private final List<ColumnSchema> columnSchemas;
-
- private final int[] columnsIndexArray;
-
private final List<DeviceEntry> deviceEntries;
-
private final int deviceCount;
-
- private final Ordering scanOrder;
- private final SeriesScanOptions seriesScanOptions;
-
+ private int currentDeviceIndex;
private final List<String> measurementColumnNames;
private final Set<String> allSensors;
-
private final List<IMeasurementSchema> measurementSchemas;
-
private final List<TSDataType> measurementColumnTSDataTypes;
+ private final int measurementCount;
- // TODO calc maxTsBlockLineNum using date_bin
- private final int maxTsBlockLineNum;
+ private final List<ColumnSchema> aggColumnSchemas;
+ private final int[] aggColumnsIndexArray;
- // for different aggregations aiming to same column, use this variable to
point to same column
- private final List<Integer> aggArguments;
+ private final SeriesScanOptions seriesScanOptions;
+ private final boolean ascending;
+ private final Ordering scanOrder;
+ // Some special data types(like BLOB) cannot use statistics
+ protected final boolean canUseStatistics;
+ private final long cachedRawDataSize;
- private QueryDataSource queryDataSource;
+ // stores all inputChannels of tableAggregators,
+ // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels
should be [0, 1, 0]
+ private final List<Integer> aggregatorInputChannels;
- private int currentDeviceIndex;
+ private QueryDataSource queryDataSource;
- ITableTimeRangeIterator timeIterator;
+ private final ITableTimeRangeIterator timeIterator;
private boolean allAggregatorsHasFinalResult = false;
public TableAggregationTableScanOperator(
PlanNodeId sourceId,
OperatorContext context,
- List<ColumnSchema> columnSchemas,
- int[] columnsIndexArray,
+ List<ColumnSchema> aggColumnSchemas,
+ int[] aggColumnsIndexArray,
List<DeviceEntry> deviceEntries,
- Ordering scanOrder,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
Set<String> allSensors,
List<IMeasurementSchema> measurementSchemas,
- int maxTsBlockLineNum,
- int measurementCount,
List<TableAggregator> tableAggregators,
List<ColumnSchema> groupingKeySchemas,
int[] groupingKeyIndex,
ITableTimeRangeIterator tableTimeRangeIterator,
boolean ascending,
- long maxReturnSize,
boolean canUseStatistics,
- List<Integer> aggArguments) {
-
- super(
- sourceId,
- context,
- null,
- measurementCount,
- null,
- null,
- ascending,
- false,
- null,
- maxReturnSize,
- (1L + measurementCount) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
- canUseStatistics);
+ List<Integer> aggregatorInputChannels) {
+ this.sourceId = sourceId;
+ this.operatorContext = context;
+ this.canUseStatistics = canUseStatistics;
this.tableAggregators = tableAggregators;
this.groupingKeySchemas = groupingKeySchemas;
this.groupingKeyIndex = groupingKeyIndex;
-
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.columnSchemas = columnSchemas;
- this.columnsIndexArray = columnsIndexArray;
+ this.aggColumnSchemas = aggColumnSchemas;
+ this.aggColumnsIndexArray = aggColumnsIndexArray;
this.deviceEntries = deviceEntries;
this.deviceCount = deviceEntries.size();
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER,
Integer.toString(this.deviceCount));
- this.scanOrder = scanOrder;
+ this.ascending = ascending;
+ this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
+ this.measurementCount = measurementColumnNames.size();
+ this.cachedRawDataSize =
+ (1L + this.measurementCount)
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.allSensors = allSensors;
this.measurementSchemas = measurementSchemas;
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
this.currentDeviceIndex = 0;
this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING,
Integer.toString(0));
- this.aggArguments = aggArguments;
+ this.aggregatorInputChannels = aggregatorInputChannels;
this.timeIterator = tableTimeRangeIterator;
- if (tableTimeRangeIterator.getType()
- == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
- curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
- }
-
- this.maxReturnSize = maxReturnSize;
- this.maxTsBlockLineNum = maxTsBlockLineNum;
constructAlignedSeriesScanUtil();
}
@@ -192,9 +168,23 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
finished = !hasNextWithTimer();
}
return finished;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return cachedRawDataSize + maxReturnSize;
+ }
- // return (retainedTsBlock == null)
- // && (currentDeviceIndex >= deviceCount ||
seriesScanOptions.limitConsumedUp());
+ @Override
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return timeIterator.getType() ==
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
+ ? cachedRawDataSize
+ : 0;
}
@Override
@@ -231,7 +221,6 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
// return true if current time window is calc finished
if (calculateAggregationResultForCurrentTimeRange()) {
timeIterator.resetCurTimeRange();
- // curTimeRange = null;
}
}
@@ -266,7 +255,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
return resultTsBlock;
}
- private void constructAlignedSeriesScanUtil() {
+ protected void constructAlignedSeriesScanUtil() {
DeviceEntry deviceEntry;
if (this.deviceEntries.isEmpty() ||
this.deviceEntries.get(this.currentDeviceIndex) == null) {
@@ -290,7 +279,6 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
/** Return true if we have the result of this timeRange. */
- @Override
protected boolean calculateAggregationResultForCurrentTimeRange() {
try {
if (calcFromCachedData()) {
@@ -353,14 +341,12 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
}
- @Override
protected void updateResultTsBlock() {
appendAggregationResult(resultTsBlockBuilder, tableAggregators);
// after appendAggregationResult invoked, aggregators must be cleared
resetTableAggregators();
}
- @Override
protected boolean calcFromCachedData() {
return calcUsingRawData(inputTsBlock);
}
@@ -420,13 +406,13 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
- Column[] valueColumns = new Column[aggArguments.size()];
- for (int idx : aggArguments) {
+ Column[] valueColumns = new Column[aggregatorInputChannels.size()];
+ for (int idx : aggregatorInputChannels) {
if (valueColumns[idx] != null) {
continue;
}
valueColumns[idx] =
- buildValueColumn(columnSchemas.get(idx).getColumnCategory(),
inputRegion, idx);
+ buildValueColumn(aggColumnSchemas.get(idx).getColumnCategory(),
inputRegion, idx);
}
TsBlock tsBlock =
@@ -463,7 +449,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
(String)
deviceEntries
.get(currentDeviceIndex)
- .getNthSegment(columnsIndexArray[columnIdx] + 1);
+ .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
return getIdOrAttrColumn(
inputRegion.getTimeColumn().getPositionCount(),
id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
@@ -472,10 +458,10 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
deviceEntries
.get(currentDeviceIndex)
.getAttributeColumnValues()
- .get(columnsIndexArray[columnIdx]);
+ .get(aggColumnsIndexArray[columnIdx]);
return
getIdOrAttrColumn(inputRegion.getTimeColumn().getPositionCount(), attr);
case MEASUREMENT:
- return inputRegion.getColumn(columnsIndexArray[columnIdx]);
+ return inputRegion.getColumn(aggColumnsIndexArray[columnIdx]);
default:
throw new IllegalStateException("Unsupported column type: " +
columnSchemaCategory);
}
@@ -507,10 +493,13 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
idx++;
TsTableColumnCategory columnSchemaCategory =
- columnSchemas.get(aggArguments.get(idx)).getColumnCategory();
+
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
statisticsArray[i] =
buildStatistics(
- columnSchemaCategory, timeStatistics, valueStatistics,
aggArguments.get(idx));
+ columnSchemaCategory,
+ timeStatistics,
+ valueStatistics,
+ aggregatorInputChannels.get(idx));
}
aggregator.processStatistics(statisticsArray);
@@ -531,7 +520,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
(String)
deviceEntries
.get(currentDeviceIndex)
- .getNthSegment(columnsIndexArray[columnIdx] + 1);
+ .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
return getStatistics(
timeStatistics, id == null ? null : new Binary(id,
TSFileConfig.STRING_CHARSET));
case ATTRIBUTE:
@@ -539,10 +528,10 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
deviceEntries
.get(currentDeviceIndex)
.getAttributeColumnValues()
- .get(columnsIndexArray[columnIdx]);
+ .get(aggColumnsIndexArray[columnIdx]);
return getStatistics(timeStatistics, attr);
case MEASUREMENT:
- return valueStatistics[columnsIndexArray[columnIdx]];
+ return valueStatistics[aggColumnsIndexArray[columnIdx]];
default:
throw new IllegalStateException("Unsupported column type: " +
columnSchemaCategory);
}
@@ -562,7 +551,6 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
@SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
- @Override
public boolean readAndCalcFromFile() throws IOException {
// start stopwatch
long start = System.nanoTime();
@@ -585,8 +573,8 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
if (timeIterator
.getCurTimeRange()
.contains(fileTimeStatistics.getStartTime(),
fileTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
+ Statistics[] statisticsList = new Statistics[measurementCount];
+ for (int i = 0; i < measurementCount; i++) {
statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
}
calcFromStatistics(fileTimeStatistics, statisticsList);
@@ -632,8 +620,8 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
.getCurTimeRange()
.contains(chunkTimeStatistics.getStartTime(),
chunkTimeStatistics.getEndTime())) {
// calc from chunkMetaData
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
+ Statistics[] statisticsList = new Statistics[measurementCount];
+ for (int i = 0; i < measurementCount; i++) {
statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
}
calcFromStatistics(chunkTimeStatistics, statisticsList);
@@ -681,8 +669,8 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
if (timeIterator
.getCurTimeRange()
.contains(pageTimeStatistics.getStartTime(),
pageTimeStatistics.getEndTime())) {
- Statistics[] statisticsList = new Statistics[subSensorSize];
- for (int i = 0; i < subSensorSize; i++) {
+ Statistics[] statisticsList = new Statistics[measurementCount];
+ for (int i = 0; i < measurementCount; i++) {
statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
}
calcFromStatistics(pageTimeStatistics, statisticsList);
@@ -866,7 +854,6 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
this.queryDataSource = (QueryDataSource) dataSource;
this.seriesScanUtil.initQueryDataSource(queryDataSource);
this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
- this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7dcdbe04c95..3873973ca85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -378,21 +378,18 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
}
- SeriesScanOptions.Builder scanOptionsBuilder =
- node.getTimePredicate().isPresent()
- ? getSeriesScanOptionsBuilder(context,
node.getTimePredicate().get())
- : new SeriesScanOptions.Builder();
- scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
- scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
- scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-
- Expression pushDownPredicate = node.getPushDownPredicate();
- if (pushDownPredicate != null) {
- scanOptionsBuilder.withPushDownFilter(
- convertPredicateToFilter(
- pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap,
timeColumnName));
- }
+ SeriesScanOptions seriesScanOptions =
+ buildSeriesScanOptions(
+ context,
+ columnSchemaMap,
+ measurementColumnNames,
+ measurementColumnsIndexMap,
+ timeColumnName,
+ node.getTimePredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice(),
+ node.getPushDownPredicate());
OperatorContext operatorContext =
context
@@ -421,7 +418,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
columnsIndexArray,
node.getDeviceEntries(),
node.getScanOrder(),
- scanOptionsBuilder.build(),
+ seriesScanOptions,
measurementColumnNames,
allSensors,
measurementSchemas,
@@ -1748,74 +1745,71 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
public Operator visitAggregationTableScan(
AggregationTableScanNode node, LocalExecutionPlanContext context) {
+ List<String> measurementColumnNames = new ArrayList<>();
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
+
List<TableAggregator> aggregators = new
ArrayList<>(node.getAggregations().size());
- Map<Symbol, Integer> columnLayout = new
HashMap<>(node.getAggregations().size());
+ List<Integer> aggregatorInputChannels =
+ new ArrayList<>(
+ (int)
+ node.getAggregations().values().stream()
+ .mapToLong(aggregation ->
aggregation.getArguments().size())
+ .sum());
+ int aggDistinctArgumentCount =
+ (int)
+ node.getAggregations().values().stream()
+ .flatMap(aggregation -> aggregation.getArguments().stream())
+ .map(Symbol::from)
+ .distinct()
+ .count();
+ List<ColumnSchema> aggColumnSchemas = new
ArrayList<>(aggDistinctArgumentCount);
+ Map<Symbol, Integer> aggColumnLayout = new
HashMap<>(aggDistinctArgumentCount);
+ int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount];
- int distinctArgumentCount = node.getAssignments().size();
- int aggregationsCount = node.getAggregations().size();
- List<Integer> aggColumnIndexes = new ArrayList<>();
+ String timeColumnName = null;
int channel = 0;
- int idx = -1;
int measurementColumnCount = 0;
- Map<Symbol, Integer> idAndAttributeColumnsIndexMap =
node.getIdAndAttributeIndexMap();
- Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments();
- List<ColumnSchema> columnSchemas = new ArrayList<>(aggregationsCount);
- int[] columnsIndexArray = new int[distinctArgumentCount];
- List<String> measurementColumnNames = new ArrayList<>();
- Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
- String timeColumnName = null;
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-
for (Map.Entry<Symbol, AggregationNode.Aggregation> entry :
node.getAggregations().entrySet()) {
- AggregationNode.Aggregation aggregation = entry.getValue();
-
- for (Expression argument : aggregation.getArguments()) {
- idx++;
+ for (Expression argument : entry.getValue().getArguments()) {
Symbol symbol = Symbol.from(argument);
- ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol),
symbol + " is null");
- switch (schema.getColumnCategory()) {
- case ID:
- case ATTRIBUTE:
- if (!columnLayout.containsKey(symbol)) {
- columnsIndexArray[channel] =
- requireNonNull(idAndAttributeColumnsIndexMap.get(symbol),
symbol + " is null");
- columnSchemas.add(schema);
- }
- break;
- case MEASUREMENT:
- if (!columnLayout.containsKey(symbol)) {
- columnsIndexArray[channel] = measurementColumnCount;
+ ColumnSchema schema =
+ requireNonNull(node.getAssignments().get(symbol), symbol + " is
null");
+ if (!aggColumnLayout.containsKey(symbol)) {
+ switch (schema.getColumnCategory()) {
+ case ID:
+ case ATTRIBUTE:
+ aggColumnsIndexArray[channel] =
+ requireNonNull(node.getIdAndAttributeIndexMap().get(symbol),
symbol + " is null");
+ break;
+ case MEASUREMENT:
+ aggColumnsIndexArray[channel] = measurementColumnCount;
measurementColumnCount++;
measurementColumnNames.add(schema.getName());
measurementSchemas.add(
new MeasurementSchema(schema.getName(),
getTSDataType(schema.getType())));
- columnSchemas.add(schema);
measurementColumnsIndexMap.put(symbol.getName(),
measurementColumnCount - 1);
- }
- break;
- case TIME:
- if (!columnLayout.containsKey(symbol)) {
- columnsIndexArray[channel] = -1;
- columnSchemas.add(schema);
+ break;
+ case TIME:
+ aggColumnsIndexArray[channel] = -1;
timeColumnName = symbol.getName();
- }
- break;
- default:
- throw new IllegalArgumentException(
- "Unexpected column category: " + schema.getColumnCategory());
- }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected column category: " + schema.getColumnCategory());
+ }
- if (!columnLayout.containsKey(symbol)) {
- aggColumnIndexes.add(channel);
- columnLayout.put(symbol, channel++);
+ aggColumnSchemas.add(schema);
+ aggregatorInputChannels.add(channel);
+ aggColumnLayout.put(symbol, channel++);
} else {
- aggColumnIndexes.add(columnLayout.get(symbol));
+ aggregatorInputChannels.add(aggColumnLayout.get(symbol));
}
}
}
for (Map.Entry<Symbol, ColumnSchema> entry :
node.getAssignments().entrySet()) {
- if (!columnLayout.containsKey(entry.getKey())
+ if (!aggColumnLayout.containsKey(entry.getKey())
&& entry.getValue().getColumnCategory() == MEASUREMENT) {
measurementColumnCount++;
measurementColumnNames.add(entry.getValue().getName());
@@ -1835,7 +1829,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
for (Map.Entry<Symbol, AggregationNode.Aggregation> entry :
node.getAggregations().entrySet()) {
aggregators.add(
buildAggregator(
- columnLayout,
+ aggColumnLayout,
entry.getKey(),
entry.getValue(),
node.getStep(),
@@ -1853,9 +1847,9 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
for (int i = 0; i < node.getGroupingKeys().size(); i++) {
Symbol groupingKey = node.getGroupingKeys().get(i);
- if (idAndAttributeColumnsIndexMap.containsKey(groupingKey)) {
- groupingKeySchemas.add(columnSchemaMap.get(groupingKey));
- groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey);
+ if (node.getIdAndAttributeIndexMap().containsKey(groupingKey)) {
+ groupingKeySchemas.add(node.getAssignments().get(groupingKey));
+ groupingKeyIndex[i] =
node.getIdAndAttributeIndexMap().get(groupingKey);
} else {
if (node.getProjection() != null
&& !node.getProjection().getMap().isEmpty()
@@ -1896,62 +1890,81 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
context.getNextOperatorId(),
node.getPlanNodeId(),
TableAggregationTableScanOperator.class.getSimpleName());
- SeriesScanOptions.Builder scanOptionsBuilder =
- node.getTimePredicate().isPresent()
- ? getSeriesScanOptionsBuilder(context,
node.getTimePredicate().get())
- : new SeriesScanOptions.Builder();
- scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
- scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
- scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
- Expression pushDownPredicate = node.getPushDownPredicate();
- if (pushDownPredicate != null) {
- scanOptionsBuilder.withPushDownFilter(
- convertPredicateToFilter(
- pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap,
timeColumnName));
- }
+ SeriesScanOptions seriesScanOptions =
+ buildSeriesScanOptions(
+ context,
+ node.getAssignments(),
+ measurementColumnNames,
+ measurementColumnsIndexMap,
+ timeColumnName,
+ node.getTimePredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice(),
+ node.getPushDownPredicate());
Set<String> allSensors = new HashSet<>(measurementColumnNames);
- // for time column
- allSensors.add("");
+ allSensors.add(""); // for time column
+
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ AlignedFullPath alignedPath =
+ constructAlignedPath(
+ node.getDeviceEntries().get(i),
+ measurementColumnNames,
+ measurementSchemas,
+ allSensors);
+ ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+ }
+
+ context.getDriverContext().setInputDriver(true);
+
TableAggregationTableScanOperator aggTableScanOperator =
new TableAggregationTableScanOperator(
node.getPlanNodeId(),
operatorContext,
- columnSchemas,
- columnsIndexArray,
+ aggColumnSchemas,
+ aggColumnsIndexArray,
node.getDeviceEntries(),
- scanAscending ? Ordering.ASC : Ordering.DESC,
- scanOptionsBuilder.build(),
+ seriesScanOptions,
measurementColumnNames,
allSensors,
measurementSchemas,
-
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
- measurementColumnCount,
aggregators,
groupingKeySchemas,
groupingKeyIndex,
timeRangeIterator,
scanAscending,
- calculateMaxAggregationResultSize(),
canUseStatistic,
- aggColumnIndexes);
-
+ aggregatorInputChannels);
((DataDriverContext)
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
+ return aggTableScanOperator;
+ }
- for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
- AlignedFullPath alignedPath =
- constructAlignedPath(
- node.getDeviceEntries().get(i),
- measurementColumnNames,
- measurementSchemas,
- allSensors);
- ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+ private SeriesScanOptions buildSeriesScanOptions(
+ LocalExecutionPlanContext context,
+ Map<Symbol, ColumnSchema> columnSchemaMap,
+ List<String> measurementColumnNames,
+ Map<String, Integer> measurementColumnsIndexMap,
+ String timeColumnName,
+ Optional<Expression> timePredicate,
+ long pushDownLimit,
+ long pushDownOffset,
+ boolean pushLimitToEachDevice,
+ Expression pushDownPredicate) {
+ SeriesScanOptions.Builder scanOptionsBuilder =
+ timePredicate
+ .map(expression -> getSeriesScanOptionsBuilder(context,
expression))
+ .orElseGet(SeriesScanOptions.Builder::new);
+ scanOptionsBuilder.withPushDownLimit(pushDownLimit);
+ scanOptionsBuilder.withPushDownOffset(pushDownOffset);
+ scanOptionsBuilder.withPushLimitToEachDevice(pushLimitToEachDevice);
+ scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
+ if (pushDownPredicate != null) {
+ scanOptionsBuilder.withPushDownFilter(
+ convertPredicateToFilter(
+ pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap,
timeColumnName));
}
-
- context.getDriverContext().setInputDriver(true);
-
- return aggTableScanOperator;
+ return scanOptionsBuilder.build();
}
@Override
@@ -2035,30 +2048,4 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
return new boolean[] {canUseStatistic, isAscending};
}
-
- public static long calculateMaxAggregationResultSize(
- // List<? extends AggregationDescriptor> aggregationDescriptors,
- // ITimeRangeIterator timeRangeIterator
- ) {
- // TODO perfect max aggregation result size logic
- return
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
- // long timeValueColumnsSizePerLine =
TimeColumn.SIZE_IN_BYTES_PER_POSITION;
- // for (AggregationDescriptor descriptor : aggregationDescriptors) {
- // List<TSDataType> outPutDataTypes =
- // descriptor.getOutputColumnNames().stream()
- // .map(typeProvider::getTableModelType)
- // .collect(Collectors.toList());
- // for (TSDataType tsDataType : outPutDataTypes) {
- // timeValueColumnsSizePerLine +=
getOutputColumnSizePerLine(tsDataType);
- // }
- // }
- //
- // return Math.min(
- //
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
- // Math.min(
- //
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
- // timeRangeIterator.getTotalIntervalNum())
- // * timeValueColumnsSizePerLine);
- }
}