cryptoe commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1529890568
########## processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.read.columnar.FrameColumnReaders; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + private final Frame frame; + private final RowSignature signature; + private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>(); + + public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = FrameType.COLUMNAR.ensureType(frame); + this.signature = signature; + } + + @Override + public Collection<String> getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Nullable + @Override + public Column findColumn(String name) + { + // Use contains so that we can negative cache. + if (!colCache.containsKey(name)) { + final int columnIndex = signature.indexOf(name); + if (columnIndex < 0) { + colCache.put(name, null); + } else { + final ColumnType columnType = signature + .getColumnType(columnIndex) + .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + + colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); + } + } + return colCache.get(name); + + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public <T> T as(Class<T> clazz) + { + if (StorageAdapter.class.equals(clazz)) { + return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); + } + if (WireTransferable.class.equals(clazz)) { Review Comment: Why would this be wire transferable. I am asking since I am unfamiliar with this space. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; +import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.NaiveSortOperatorFactory; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery> +{ + private final ObjectMapper jsonMapper; + + public WindowOperatorQueryKit(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public QueryDefinition makeQueryDefinition( + String queryId, + WindowOperatorQuery originalQuery, + QueryKit<Query<?>> queryKit, + ShuffleSpecFactory resultShuffleSpecFactory, + int maxWorkerCount, + int minStageNumber + ) + { + // need to validate query first + // populate the group of operators to be processed as each stage + // the size of the operators is the number of serialized stages + // later we should also check if these can be parallelized + // check there is an empty over clause or not + List<List<OperatorFactory>> operatorList = new ArrayList<>(); + boolean status = validateAndReturnOperatorList(originalQuery, operatorList); Review Comment: Nit: This status is cryptic. Could you add some java docs to the method. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java: ########## @@ -140,6 +141,20 @@ public QueryDefinition makeQueryDefinition( // Add partition boosting column. clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); + final RowSignature signatureSoFar = signatureBuilder.build(); + boolean addShuffle = true; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + for (KeyColumn c : windowClusterBy.getColumns()) { + if (!signatureSoFar.contains(c.columnName())) { + addShuffle = false; + break; + } + } + if (addShuffle) { + clusterByColumns.addAll(windowClusterBy.getColumns()); Review Comment: Here you are partition boosting and adding clustering columns. Won't stuff break ? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + // the result signature might change + // if window shufle spec is added + // say the output signature was d0, d1 + // But shuffle spec for window was d1 + // create the shufflespec from the column in the context + // and sort after wards to ensure prefix of shuffle is in row signature + final ShuffleSpec nextShuffleWindowSpec; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new HashShuffleSpec( + windowClusterBy, + maxWorkerCount + ); + } else { + nextShuffleWindowSpec = null; + } + final ShuffleSpec stageShuffleSpec; + if (shuffleSpecFactoryPostAggregation != null) { + List<KeyColumn> columns = resultClusterBy.getColumns(); + if (nextShuffleWindowSpec != null) { + columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns()); Review Comment: This can have a boosted col. Won't windowing break because of that. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + // the result signature might change + // if window shufle spec is added + // say the output signature was d0, d1 + // But shuffle spec for window was d1 + // create the shufflespec from the column in the context + // and sort after wards to ensure prefix of shuffle is in row signature + final ShuffleSpec nextShuffleWindowSpec; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new HashShuffleSpec( + windowClusterBy, + maxWorkerCount + ); + } else { + nextShuffleWindowSpec = null; + } + final ShuffleSpec stageShuffleSpec; Review Comment: This shuffle spec needs a better name. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; +import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.NaiveSortOperatorFactory; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery> +{ + private final ObjectMapper jsonMapper; + + public WindowOperatorQueryKit(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public QueryDefinition makeQueryDefinition( + String queryId, + WindowOperatorQuery originalQuery, + QueryKit<Query<?>> queryKit, + ShuffleSpecFactory resultShuffleSpecFactory, + int maxWorkerCount, + int minStageNumber + ) + { + // need to validate query first + // populate the group of operators to be processed as each stage + // the size of the operators is the number of serialized stages + // later we should also check if these can be parallelized + // check there is an empty over clause or not + List<List<OperatorFactory>> operatorList = new ArrayList<>(); + boolean status = validateAndReturnOperatorList(originalQuery, operatorList); + + + ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); + // add this shuffle spec to the last stage of the inner query + + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId); + if (nextShuffleSpec != null) { + final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy(); + originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of( + MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, + windowClusterBy + )); + } + final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( + queryKit, + queryId, + originalQuery.context(), + originalQuery.getDataSource(), + originalQuery.getQuerySegmentSpec(), + originalQuery.getFilter(), + null, + maxWorkerCount, + minStageNumber, + false + ); + + dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll); + + final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); + final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); + RowSignature rowSignature = queryToRun.getRowSignature(); + + + if (status) { + // empty over clause found + // moving everything to a single partition + queryDefBuilder.add( + StageDefinition.builder(firstStageNumber) + .inputs(new StageInputSpec(firstStageNumber - 1)) + .signature(rowSignature) + .maxWorkerCount(maxWorkerCount) + .shuffleSpec(ShuffleSpecFactories.singlePartition() Review Comment: This should be null rite. Since why would you require a single partition as a follow up stage to empty over ? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -166,34 +170,83 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + // the result signature might change + // if window shufle spec is added + // say the output signature was d0, d1 + // But shuffle spec for window was d1 + // create the shufflespec from the column in the context + // and sort after wards to ensure prefix of shuffle is in row signature + final ShuffleSpec nextShuffleWindowSpec; Review Comment: I mean the method `computeResultClusterBy` should have the windowing logic pushed in also. Currently it gives you something and then you mutate it based on the windowing requirement. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + // the result signature might change + // if window shufle spec is added + // say the output signature was d0, d1 + // But shuffle spec for window was d1 + // create the shufflespec from the column in the context + // and sort after wards to ensure prefix of shuffle is in row signature + final ShuffleSpec nextShuffleWindowSpec; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new HashShuffleSpec( + windowClusterBy, + maxWorkerCount + ); + } else { + nextShuffleWindowSpec = null; + } + final ShuffleSpec stageShuffleSpec; + if (shuffleSpecFactoryPostAggregation != null) { + List<KeyColumn> columns = resultClusterBy.getColumns(); + if (nextShuffleWindowSpec != null) { + columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns()); + // Creating a new cluster by with the columns from existing + // plus the columns from the next window partition column + final ClusterBy tmp = new ClusterBy(columns, resultClusterBy.getBucketByCount()); + stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(tmp, false); + } else { + stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(resultClusterBy, false); + } + } else { + stageShuffleSpec = nextShuffleWindowSpec; + } + final RowSignature stageSignature; + if (stageShuffleSpec == null) { + stageSignature = resultSignature; + } else { + // sort the signature to make sure the prefix is aligned + stageSignature = QueryKitUtils.sortableSignature( + resultSignature, + stageShuffleSpec.clusterBy().getColumns() + ); + } if (doLimitOrOffset) { + queryDefBuilder.add( + StageDefinition.builder(firstStageNumber + 1) + .inputs(new StageInputSpec(firstStageNumber)) + .signature(resultSignature) + .maxWorkerCount(maxWorkerCount) + .shuffleSpec( + shuffleSpecFactoryPostAggregation != null + ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) + : null + ) + .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) + ); final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec(); queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 2) .inputs(new StageInputSpec(firstStageNumber + 1)) .signature(resultSignature) .maxWorkerCount(1) - .shuffleSpec(null) // no shuffling should be required after a limit processor. + // no shuffling should be required after a limit processor. + // but need one if the next stage is a window with a partition by + .shuffleSpec(nextShuffleWindowSpec) Review Comment: We are not sure if windowing is there. Currently the code is a little obtuse and assuming that if window shuffle spec is null, no windowing is there. I feel for readability we should make the nextShuffleWindowSpec optional and then do a get here. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.frame.processor.manager.ProcessorManagers; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.ReadableInput; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +@JsonTypeName("window") +public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessorFactory +{ + private final WindowOperatorQuery query; + private final List<OperatorFactory> operatorList; + private final RowSignature stageRowSignature; + private final boolean isEmptyOver; + + @JsonCreator + public WindowOperatorQueryFrameProcessorFactory( + @JsonProperty("query") WindowOperatorQuery query, + @JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList, + @JsonProperty("stageRowSignature") RowSignature stageRowSignature, + @JsonProperty("emptyOver") boolean emptyOver + ) + { + this.query = Preconditions.checkNotNull(query, "query"); + this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator"); + this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); + this.isEmptyOver = emptyOver; + } + + @JsonProperty("query") + public WindowOperatorQuery getQuery() + { + return query; + } + + @JsonProperty("operatorList") + public List<OperatorFactory> getOperators() + { + return operatorList; + } + + @JsonProperty("stageRowSignature") + public RowSignature getSignature() + { + return stageRowSignature; + } + + @JsonProperty("emptyOver") + public boolean isEmptyOverFound() + { + return isEmptyOver; + } + + @Override + public ProcessorsAndChannels<Object, Long> makeProcessors( + StageDefinition stageDefinition, + int workerNumber, + List<InputSlice> inputSlices, + InputSliceReader inputSliceReader, + @Nullable Object extra, + OutputChannelFactory outputChannelFactory, + FrameContext frameContext, + int maxOutstandingProcessors, + CounterTracker counters, + Consumer<Throwable> warningPublisher + ) + { + // Expecting a single input slice from some prior stage. + final StageInputSlice slice = (StageInputSlice) Iterables.getOnlyElement(inputSlices); + final Int2ObjectSortedMap<OutputChannel> outputChannels = new Int2ObjectAVLTreeMap<>(); + + for (final ReadablePartition partition : slice.getPartitions()) { + outputChannels.computeIfAbsent( + partition.getPartitionNumber(), + i -> { + try { + return outputChannelFactory.openChannel(i); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + ); + } + + final Sequence<ReadableInput> readableInputs = + Sequences.simple(inputSliceReader.attach(0, slice, counters, warningPublisher)); + + final Sequence<FrameProcessor<Object>> processors = readableInputs.map( + readableInput -> { + final OutputChannel outputChannel = + outputChannels.get(readableInput.getStagePartition().getPartitionNumber()); + + return new WindowOperatorQueryFrameProcessor( + query, + readableInput.getChannel(), + outputChannel.getWritableChannel(), + stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator()), + readableInput.getChannelFrameReader(), + frameContext.jsonMapper(), + operatorList, + stageRowSignature, + isEmptyOver + ); + } + ); + + return new ProcessorsAndChannels<>( + ProcessorManagers.of(processors), + OutputChannels.wrapReadOnly(ImmutableList.copyOf(outputChannels.values())) + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o; + return Objects.equals(query, that.query) + && Objects.equals(operatorList, that.operatorList) + && Objects.equals(stageRowSignature, that.stageRowSignature); Review Comment: empty over is not there in both equals and hashcode. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ########## @@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) - ); + // the result signature might change + // if window shufle spec is added + // say the output signature was d0, d1 + // But shuffle spec for window was d1 + // create the shufflespec from the column in the context + // and sort after wards to ensure prefix of shuffle is in row signature + final ShuffleSpec nextShuffleWindowSpec; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new HashShuffleSpec( + windowClusterBy, + maxWorkerCount + ); + } else { + nextShuffleWindowSpec = null; Review Comment: If nextShuffleSpec is null then we can just revert to original behaviour and skip all this new code. Group by query kit is very core to MSQ. I feel doing code changes in a way which minimizes regressions is very important. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java: ########## @@ -198,6 +198,9 @@ private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery scanQu @Override protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataServerQueryHandler dataServerQueryHandler) throws IOException { + // Try to run with the segment on the server + // if segment was not found for some reason Review Comment: This comments are not accurate. I would suggest removing them. ########## processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java: ########## @@ -80,6 +84,35 @@ public static MapOfColumnsRowsAndColumns fromMap(Map<String, ? extends Column> m ); } + public static MapOfColumnsRowsAndColumns fromResultRow(ArrayList<ResultRow> objs, RowSignature signature) Review Comment: Lets keep this comment open as it would be helpful for follow on patches. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java: ########## @@ -198,6 +198,9 @@ private static ScanQuery prepareScanQueryForDataServer(@NotNull ScanQuery scanQu @Override protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataServerQueryHandler dataServerQueryHandler) throws IOException { + // Try to run with the segment on the server + // if segment was not found for some reason Review Comment: If segment is handed off, only then fetch from deep storage is the correct flow. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; +import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.NaiveSortOperatorFactory; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery> +{ + private final ObjectMapper jsonMapper; + + public WindowOperatorQueryKit(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public QueryDefinition makeQueryDefinition( + String queryId, + WindowOperatorQuery originalQuery, + QueryKit<Query<?>> queryKit, + ShuffleSpecFactory resultShuffleSpecFactory, + int maxWorkerCount, + int minStageNumber + ) + { + // need to validate query first + // populate the group of operators to be processed as each stage + // the size of the operators is the number of serialized stages + // later we should also check if these can be parallelized + // check there is an empty over clause or not + List<List<OperatorFactory>> operatorList = new ArrayList<>(); + boolean status = validateAndReturnOperatorList(originalQuery, operatorList); + + + ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); + // add this shuffle spec to the last stage of the inner query + + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId); + if (nextShuffleSpec != null) { + final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy(); + originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of( + MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, + windowClusterBy + )); + } + final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( + queryKit, + queryId, + originalQuery.context(), + originalQuery.getDataSource(), + originalQuery.getQuerySegmentSpec(), + originalQuery.getFilter(), + null, + maxWorkerCount, + minStageNumber, + false + ); + + dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll); + + final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); + final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); + RowSignature rowSignature = queryToRun.getRowSignature(); + + + if (status) { + // empty over clause found + // moving everything to a single partition + queryDefBuilder.add( + StageDefinition.builder(firstStageNumber) + .inputs(new StageInputSpec(firstStageNumber - 1)) Review Comment: why is this firstStageNumber -1 ? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java: ########## @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.FrameRowTooLargeException; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.util.SettableLongVirtualColumn; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.OffsetLimit; +import org.apache.druid.query.operator.Operator; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object> +{ + private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); + private final WindowOperatorQuery query; + + private final List<OperatorFactory> operatorFactoryList; + private final ObjectMapper jsonMapper; + private final RowSignature outputStageSignature; + private final ArrayList<RowsAndColumns> frameRowsAndCols; + private final ArrayList<RowsAndColumns> resultRowAndCols; + private final ReadableFrameChannel inputChannel; + private final WritableFrameChannel outputChannel; + private final FrameWriterFactory frameWriterFactory; + private final FrameReader frameReader; + private final SettableLongVirtualColumn partitionBoostVirtualColumn; + ArrayList<ResultRow> objectsOfASingleRac; + List<Integer> partitionColsIndex; + private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed + private Cursor frameCursor = null; + private Supplier<ResultRow> rowSupplierFromFrameCursor; + private ResultRow outputRow = null; + private FrameWriter frameWriter = null; + + public WindowOperatorQueryFrameProcessor( + WindowOperatorQuery query, + ReadableFrameChannel inputChannel, + WritableFrameChannel outputChannel, + FrameWriterFactory frameWriterFactory, + FrameReader frameReader, + ObjectMapper jsonMapper, + final List<OperatorFactory> operatorFactoryList, + final RowSignature rowSignature + ) + { + this.inputChannel = inputChannel; + this.outputChannel = outputChannel; + this.frameWriterFactory = frameWriterFactory; + this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); + this.operatorFactoryList = operatorFactoryList; + this.outputStageSignature = rowSignature; + this.jsonMapper = jsonMapper; + this.frameReader = frameReader; + this.query = query; + this.frameRowsAndCols = new ArrayList<>(); + this.resultRowAndCols = new ArrayList<>(); + this.objectsOfASingleRac = new ArrayList<>(); + this.partitionColsIndex = new ArrayList<>(); + } + + private static VirtualColumns makeVirtualColumnsForFrameWriter( + @Nullable final VirtualColumn partitionBoostVirtualColumn, + final ObjectMapper jsonMapper, + final WindowOperatorQuery query + ) + { + List<VirtualColumn> virtualColumns = new ArrayList<>(); + + virtualColumns.add(partitionBoostVirtualColumn); + final VirtualColumn segmentGranularityVirtualColumn = + QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query); + if (segmentGranularityVirtualColumn != null) { + virtualColumns.add(segmentGranularityVirtualColumn); + } + + return VirtualColumns.create(virtualColumns); + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.singletonList(outputChannel); + } + + @Override + public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) + { + /* + * + * PARTITION BY A ORDER BY B + * + * Frame 1 -> rac1 + * A B + * 1, 2 + * 1, 3 + * 2, 1 --> key changed + * 2, 2 + * + * + * Frame 2 -> rac2 + * 3, 1 --> key changed + * 3, 2 + * 3, 3 + * 3, 4 + * + * Frame 3 -> rac3 + * + * 3, 5 + * 3, 6 + * 4, 1 --> key changed + * 4, 2 + * + * In case of empty OVER clause, all these racs need to be added to a single rows and columns + * to be processed. The way we can do this is to use a ConcatRowsAndColumns + * ConcatRC [rac1, rac2, rac3] + * Run all ops on this + * + * + * The flow would look like: + * 1. Validate if the operator has an empty OVER clause + * 2. If 1 is true make a giant rows and columns (R&C) using concat as shown above + * Let all operators run amok on that R&C + * 3. If 1 is false + * Read a frame + * keep the older row in a class variable + * check row by row and compare current with older row to check if partition boundary is reached + * when frame partition by changes + * create R&C for those particular set of columns, they would have the same partition key + * output will be a single R&C + * write to output channel + * + * + * Future thoughts: + * + * 1. We are writing 1 partition to each frame in this way. In case of high cardinality data + * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * say 20k rows and keep on adding to the same pending frame and not create a new frame + * + * 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data + * with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. + * We might think to reimplement them in the MSQ way so that we do not have to materialize so much data + */ + + // Phase 1 of the execution + // eagerly validate presence of empty OVER() clause + boolean status = checkEagerlyForEmptyWindow(operatorFactoryList); + if (status) { + // if OVER() found + // have to bring all data to a single executor for processing + // convert each frame to rac + // concat all the racs to make a giant rac + // let all operators run on the giant rac when channel is finished + if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + convertRowFrameToRowsAndColumns(frame); + } else if (inputChannel.isFinished()) { + runAllOpsOnMultipleRac(frameRowsAndCols); + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.awaitAll(inputChannels().size()); + } + return ReturnOrAwait.runAgain(); + } else { + // Aha, you found a PARTITION BY and maybe ORDER BY TO + // PARTITION BY can also be on multiple keys + // typically the last stage would already partition and sort for you + // figure out frame boundaries and convert each distinct group to a rac + // then run the windowing operator only on each rac + if (frameCursor == null || frameCursor.isDone()) { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } else if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + frameCursor = FrameProcessors.makeCursor(frame, frameReader); + final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); + partitionColsIndex = findPartitionColumns(frameReader.signature()); + final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()]; + for (int i = 0; i < fieldSuppliers.length; i++) { + final ColumnValueSelector<?> selector = + frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i)); + fieldSuppliers[i] = selector::getObject; + + } + rowSupplierFromFrameCursor = () -> { + final ResultRow row = ResultRow.create(fieldSuppliers.length); + for (int i = 0; i < fieldSuppliers.length; i++) { + row.set(i, fieldSuppliers[i].get()); + } + return row; + }; + } else if (inputChannel.isFinished()) { + // reaached end of channel + // if there is data remaining + // write it into a rac + // and run operators on it + if (!objectsOfASingleRac.isEmpty()) { + RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( + objectsOfASingleRac, + frameReader.signature() + ); + runAllOpsOnSingleRac(rac); + objectsOfASingleRac.clear(); + } + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.runAgain(); + } + } + while (!frameCursor.isDone()) { + final ResultRow currentRow = rowSupplierFromFrameCursor.get(); + if (outputRow == null) { + outputRow = currentRow.copy(); + objectsOfASingleRac.add(currentRow); + } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) { + // if they have the same partition key + // keep adding them + objectsOfASingleRac.add(currentRow); + } else { + // key change noted + // create rac from the rows seen before + // run the operators on these rows and columns + // clean up the object to hold the new rows only + RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( + objectsOfASingleRac, + frameReader.signature() + ); + runAllOpsOnSingleRac(rac); + objectsOfASingleRac.clear(); + outputRow = currentRow.copy(); + return ReturnOrAwait.runAgain(); + } + frameCursor.advance(); + } + } + return ReturnOrAwait.runAgain(); + } + + /** + * @param operatorFactoryList the list of operators to check for empty window + * @return true is there is a single OVER() clause across all the operators, false otherwise + */ + private boolean checkEagerlyForEmptyWindow(List<OperatorFactory> operatorFactoryList) + { + for (OperatorFactory of : operatorFactoryList) { + if (of instanceof NaivePartitioningOperatorFactory) { + if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) { + return true; + } + } + } + return false; + } + + /** + * @param singleRac Use this {@link RowsAndColumns} as a single input for the operators to be run + */ + private void runAllOpsOnSingleRac(RowsAndColumns singleRac) + { + Operator op = new Operator() + { + @Nullable + @Override + public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) + { + receiver.push(singleRac); + receiver.completed(); + return null; + } + }; + runOperatorsAfterThis(op); + } + + /** + * @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run + */ + private void runAllOpsOnMultipleRac(ArrayList<RowsAndColumns> listOfRacs) + { + Operator op = new Operator() + { + @Nullable + @Override + public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) + { + RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs); + receiver.push(rac); + receiver.completed(); + return null; + } + }; + runOperatorsAfterThis(op); + } + + /** + * @param op Base operator for the operators to be run. Other operators are wrapped under this to run + */ + private void runOperatorsAfterThis(Operator op) + { + for (OperatorFactory of : operatorFactoryList) { + op = of.wrap(op); + } + Operator.go(op, new Operator.Receiver() + { + @Override + public Operator.Signal push(RowsAndColumns rac) + { + resultRowAndCols.add(rac); + return Operator.Signal.GO; + } + + @Override + public void completed() + { + try { + flushAllRowsAndCols(resultRowAndCols); + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + resultRowAndCols.clear(); + } + } + }); + } + + /** + * @param resultRowAndCols Flush the list of {@link RowsAndColumns} to a frame + * @throws IOException + */ + private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException + { + RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols); + AtomicInteger rowId = new AtomicInteger(0); + createFrameWriterIfNeeded(rac, rowId); + writeRacToFrame(rac, rowId); + } + + /** + * @param rac The frame writer to write this {@link RowsAndColumns} object + * @param rowId RowId to get the column selector factory from the {@link RowsAndColumns} object + */ + private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId) + { + if (frameWriter == null) { + final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac); + final ColumnSelectorFactory frameWriterColumnSelectorFactory = makeVirtualColumnsForFrameWriter( + partitionBoostVirtualColumn, + jsonMapper, + query + ).wrap(csfm.make(rowId)); + frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory); + currentAllocatorCapacity = frameWriterFactory.allocatorCapacity(); + } + } + + /** + * @param rac {@link RowsAndColumns} to be written to frame + * @param rowId Counter to keep track of how many rows are added + * @throws IOException + */ + public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException + { + final int numRows = rac.numRows(); + rowId.set(0); + while (rowId.get() < numRows) { + final boolean didAddToFrame = frameWriter.addSelection(); + if (didAddToFrame) { + incrementBoostColumn(); + rowId.incrementAndGet(); + } else if (frameWriter.getNumRows() == 0) { + throw new FrameRowTooLargeException(currentAllocatorCapacity); + } else { + flushFrameWriter(); + return; + } + } + flushFrameWriter(); + } + + @Override + public void cleanup() throws IOException + { + FrameProcessors.closeAll(inputChannels(), outputChannels(), frameWriter); + } + + /** + * @return Number of rows flushed to the output channel + * @throws IOException + */ + private long flushFrameWriter() throws IOException + { + if (frameWriter == null || frameWriter.getNumRows() <= 0) { + if (frameWriter != null) { + frameWriter.close(); + frameWriter = null; + } + return 0; + } else { + final Frame frame = Frame.wrap(frameWriter.toByteArray()); + Iterables.getOnlyElement(outputChannels()).write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION)); + frameWriter.close(); + frameWriter = null; + return frame.numRows(); + } + } + + /** + * @param frame Row based frame to be converted to a {@link RowsAndColumns} object + */ + private void convertRowFrameToRowsAndColumns(Frame frame) + { + final RowSignature signature = frameReader.signature(); + RowBasedFrameRowsAndColumns frameRowsAndColumns = new RowBasedFrameRowsAndColumns(frame, signature); + LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns( + frameRowsAndColumns, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); + frameRowsAndCols.add(ldrc); + } + + private List<Integer> findPartitionColumns(RowSignature rowSignature) + { + List<Integer> indexList = new ArrayList<>(); + for (OperatorFactory of : operatorFactoryList) { + if (of instanceof NaivePartitioningOperatorFactory) { + for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { + indexList.add(rowSignature.indexOf(s)); + } + } + } + return indexList; + } + + private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices) Review Comment: How would stuff work in a scan query which has a boosted column. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
