gianm commented on code in PR #19061:
URL: https://github.com/apache/druid/pull/19061#discussion_r2937226380
##########
processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java:
##########
@@ -89,10 +94,25 @@ public <T> Collection<T> prune(Iterable<T> input, Function<T, DataSegment> conve
Map<String, RangeSet<String>> filterDomain = new HashMap<>();
List<String> dimensions = shard.getDomainDimensions();
for (String dimension : dimensions) {
- if (filterFields == null || filterFields.contains(dimension)) {
+ final VirtualColumn shardVirtualColumn = shard.getDomainVirtualColumns().getVirtualColumn(dimension);
Review Comment:
Consider adding test cases that verify pruning still works if the query-time
virtual column doesn't have the same name as the one in the shard spec. Maybe
it's there but I didn't see one.
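To make the ask concrete, a rough sketch of the scenario such a test would cover: the shard spec's domain virtual column and the query-time virtual column share an expression but not a name. (`buildShardSpecWith`, `prunedSegments`, and `expectedSegments` are hypothetical stand-ins for whatever fixtures this PR's tests already use.)

```java
// Shard spec was written with a virtual column named "v_shard"; the query
// filters on a virtual column named "v0" that has the same expression.
final VirtualColumn shardSideColumn = new ExpressionVirtualColumn(
    "v_shard", "concat(\"dim1\", 'x')", ColumnType.STRING, ExprMacroTable.nil());
final VirtualColumn querySideColumn = new ExpressionVirtualColumn(
    "v0", "concat(\"dim1\", 'x')", ColumnType.STRING, ExprMacroTable.nil());

final DimensionRangeShardSpec shardSpec = buildShardSpecWith(shardSideColumn);        // hypothetical helper
final DimFilter filter = new EqualityFilter("v0", ColumnType.STRING, "abcx", null);

// Pruning should still drop segments whose range excludes "abcx", even though
// the query-time name "v0" differs from the shard-side name "v_shard".
Assert.assertEquals(expectedSegments, prunedSegments(shardSpec, filter, querySideColumn));  // hypothetical helper
```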
##########
processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java:
##########
@@ -127,6 +137,12 @@ public List<String> getDomainDimensions()
return Collections.unmodifiableList(dimensions);
}
+ @Override
+ public VirtualColumns getDomainVirtualColumns()
Review Comment:
Javadoc please. I don't think it will be immediately obvious what this means.
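For example, something along these lines would do (wording is only a suggestion, and it assumes shard specs without domain virtual columns return `VirtualColumns.EMPTY`; adjust to the actual semantics):

```java
/**
 * Virtual columns that the dimensions returned by {@link #getDomainDimensions()} may refer to.
 * When a shard spec was created by clustering on a virtual column, segment pruning needs the
 * column's definition in order to evaluate query filters against the shard's value range.
 * Shard specs that do not involve virtual columns return {@link VirtualColumns#EMPTY}.
 */
@Override
public VirtualColumns getDomainVirtualColumns()
```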
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java:
##########
@@ -80,17 +83,20 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
{
private final DataSchema dataSchema;
private final ColumnMappings columnMappings;
+ private final Map<String, VirtualColumn> clusterByVirtualColumnMappings;
private final MSQTuningConfig tuningConfig;
@JsonCreator
public SegmentGeneratorStageProcessor(
@JsonProperty("dataSchema") final DataSchema dataSchema,
@JsonProperty("columnMappings") final ColumnMappings columnMappings,
+ @JsonProperty("clusterByVirtualColumnsMappings") @Nullable final
Map<String, VirtualColumn> clusterByVirtualColumnMappings,
Review Comment:
Does not match `getClusterByVirtualColumnMappings()` (`Columns` vs
`Column`). Please add a serde test.
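A round-trip along these lines would catch the mismatch (a sketch only; the fixture arguments and mapper setup are placeholders for whatever the existing MSQ processor tests use, and the processor may need injectables registered on the mapper):

```java
final ObjectMapper mapper = TestHelper.makeJsonMapper();   // or however the existing MSQ tests build the mapper
final SegmentGeneratorStageProcessor processor = new SegmentGeneratorStageProcessor(
    dataSchema,                                            // placeholder fixtures
    columnMappings,
    ImmutableMap.of("v0", someVirtualColumn),
    tuningConfig
);

// Serialize and deserialize; the Columns/Column naming mismatch would surface here,
// because the new map would come back null instead of round-tripping.
final String json = mapper.writeValueAsString(processor);
final SegmentGeneratorStageProcessor fromJson = mapper.readValue(json, SegmentGeneratorStageProcessor.class);
Assert.assertEquals(processor, fromJson);
```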
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -508,31 +548,47 @@ private Query<?> buildScanQuery(
Map<String, VirtualColumn> inputColToVirtualCol
)
{
- RowSignature rowSignature = getRowSignature(dataSchema);
- VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values()));
+ RowSignature baseRowSignature = getRowSignature(dataSchema);
+ final List<String> columns = new ArrayList<>(baseRowSignature.getColumnNames());
+ final List<OrderBy> orderBys;
+
+ RowSignature.Builder rowSignatureWithOrderByBuilder = RowSignature.builder().addAll(baseRowSignature);
+
+ // when clustering by a virtual column, we might need to add the virtual column to columns list and row signature
+ if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
+ List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
+ orderBys = new ArrayList<>();
+ for (OrderByColumnSpec spec : orderByColumnSpecs) {
+ orderBys.add(new OrderBy(spec.getDimension(), Order.fromString(spec.getDirection().toString())));
+
+ final VirtualColumn vc = inputColToVirtualCol.get(spec.getDimension());
+ if (vc != null) {
+ columns.add(spec.getDimension());
+ final ColumnCapabilities capabilities = vc.capabilities(baseRowSignature, vc.getOutputName());
+ DruidException.conditionalDefensive(
+ capabilities != null,
+ "virtual column[%s] has null capabilities, cannot determine output type",
+ vc.getOutputName()
+ );
+ rowSignatureWithOrderByBuilder.add(spec.getDimension(), capabilities.toColumnType());
+ }
+ }
+ } else {
+ orderBys = null;
+ }
+
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(getInputDataSource(dataSchema.getDataSource()))
- .columns(rowSignature.getColumnNames())
- .virtualColumns(virtualColumns)
- .columnTypes(rowSignature.getColumnTypes())
+ .columns(columns)
+ .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
+ .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
.intervals(segmentSpec)
.filters(dataSchema.getTransformSpec().getFilter())
+ .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
+ .columns(columns)
+ .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
Review Comment:
`columnTypes`, `columns`, and `virtualColumns` appear twice in this list
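If the duplication is a merge leftover, presumably each call is only needed once; a minimal sketch using the same arguments that already appear above:

```java
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
    .dataSource(getInputDataSource(dataSchema.getDataSource()))
    .columns(columns)
    .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
    .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
    .intervals(segmentSpec)
    .filters(dataSchema.getTransformSpec().getFilter());
```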
##########
processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java:
##########
@@ -203,6 +203,10 @@ public Function<Sequence<ResultRow>, Sequence<ResultRow>> build(final GroupByQue
sortingNeeded = true;
break;
}
+ if (query.getVirtualColumns().getVirtualColumn(columnSpec.getDimension()) != null) {
Review Comment:
This doesn't seem right. The `OrderByColumnSpec` refers to dimension and
aggregator output names. Virtual columns would potentially contain the names of
input fields to dimensions and aggregators, but wouldn't contain the output
names. What was the check needed for?
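To illustrate the naming concern with made-up names: the limit spec column is the dimension's output name, while the virtual columns are keyed by the input name, so this lookup would not match anything.

```java
// Virtual column "v0" is the *input* to a dimension whose *output* name is "d0".
final VirtualColumn v0 = new ExpressionVirtualColumn(
    "v0", "concat(\"dim1\", 'x')", ColumnType.STRING, ExprMacroTable.nil());
final DimensionSpec d0 = new DefaultDimensionSpec("v0", "d0");

// The limit spec's OrderByColumnSpec refers to the output name "d0"...
final OrderByColumnSpec orderBy = new OrderByColumnSpec("d0", OrderByColumnSpec.Direction.ASCENDING);

// ...so, for a GroupByQuery built from these, the check above amounts to
// query.getVirtualColumns().getVirtualColumn("d0"), which is null: the
// virtual columns only know the input name "v0".
```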
##########
multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java:
##########
@@ -80,17 +83,20 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
{
private final DataSchema dataSchema;
private final ColumnMappings columnMappings;
+ private final Map<String, VirtualColumn> clusterByVirtualColumnMappings;
Review Comment:
This new field is missing from `equals` and `hashCode`. Please add an
`EqualsVerifier` test.
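A minimal sketch of the kind of check meant here (field names taken from the class above; the `usingGetClass`/nonnull settings may need adjusting to how the class is actually written):

```java
@Test
public void testEqualsAndHashCode()
{
  EqualsVerifier.forClass(SegmentGeneratorStageProcessor.class)
                .usingGetClass()
                .withNonnullFields("dataSchema", "columnMappings", "tuningConfig")
                .verify();
}
```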
##########
processing/src/main/java/org/apache/druid/frame/key/ClusterBy.java:
##########
@@ -63,17 +63,14 @@ public ClusterBy(
// Key must be 100% sortable or 100% nonsortable. If empty, call it sortable.
boolean sortable = true;
-
Review Comment:
The changes in this file have become formatting-only; how about reverting them to match `master`?
##########
multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1811,27 +1822,41 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}
- Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
- .getSegmentGranularity();
+ Granularity segmentGranularity = destination.getSegmentGranularity();
GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
- querySpec.getContext()
- .getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
+ querySpec.getContext().getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
dataSchema.getGranularitySpec().isRollup(),
// Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY
- ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks()
+ destination.getReplaceTimeChunks()
);
DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
- CompactionTransformSpec transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
- ? null
- : CompactionTransformSpec.of(dataSchema.getTransformSpec());
+
+ // if the clustered by requires virtual columns, preserve them here so that we can rebuild during compaction
+ CompactionTransformSpec transformSpec;
+ // this is true if we are in here
Review Comment:
what is true if we are in here?