imply-cheddar commented on code in PR #13506:
URL: https://github.com/apache/druid/pull/13506#discussion_r1125836072
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java:
##########
@@ -212,24 +219,51 @@ public boolean doesShuffle()
public boolean doesSortDuringShuffle()
{
- if (shuffleSpec == null) {
+ if (shuffleSpec == null || shuffleSpec.clusterBy().isEmpty()) {
return false;
} else {
- return !shuffleSpec.getClusterBy().getColumns().isEmpty() || shuffleSpec.needsStatistics();
+ return shuffleSpec.clusterBy().sortable();
}
}
- public Optional<ShuffleSpec> getShuffleSpec()
+ /**
+ * Returns the {@link ShuffleSpec} for this stage, if {@link #doesShuffle()}.
+ *
+ * @throws IllegalStateException if this stage does not shuffle
+ */
+ public ShuffleSpec getShuffleSpec()
{
- return Optional.ofNullable(shuffleSpec);
+ if (shuffleSpec == null) {
+ throw new IllegalStateException("Stage does not shuffle");
+ }
+
+ return shuffleSpec;
}
Review Comment:
Is there a way to know whether the stage shuffles before calling this
method (to avoid the ISE)?
I see changes to `doesSortDuringShuffle`, but that seems to indicate more
than just "does the current stage shuffle".
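The new Javadoc on `getShuffleSpec()` links to `doesShuffle()`, so the intended caller-side guard is presumably something like the sketch below. This is only an illustration: `StageSketch` is a hypothetical stand-in, the spec is reduced to a `String`, and the body of `doesShuffle()` is assumed to be a plain null check, which the diff does not show.

```java
// Hypothetical stand-in for StageDefinition; only the two methods discussed
// here are modeled, and the shuffle spec is reduced to a String.
final class StageSketch
{
  private final String shuffleSpec; // null means "this stage does not shuffle"

  StageSketch(String shuffleSpec)
  {
    this.shuffleSpec = shuffleSpec;
  }

  // Assumption: doesShuffle() is a plain null check on the spec.
  boolean doesShuffle()
  {
    return shuffleSpec != null;
  }

  // Mirrors the new getShuffleSpec() from the diff: throws if there is no spec.
  String getShuffleSpec()
  {
    if (shuffleSpec == null) {
      throw new IllegalStateException("Stage does not shuffle");
    }
    return shuffleSpec;
  }

  // Caller-side guard: check doesShuffle() first so the ISE is never hit.
  static String describe(StageSketch stage)
  {
    return stage.doesShuffle() ? "shuffles with " + stage.getShuffleSpec() : "no shuffle";
  }
}
```

If that is the contract, it may be worth saying so explicitly in the `@throws` text.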
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java:
##########
@@ -93,15 +110,26 @@ public Either<Long, ClusterByPartitions> generatePartitions(
@Override
@JsonProperty
- public ClusterBy getClusterBy()
+ public ClusterBy clusterBy()
{
return clusterBy;
}
- @JsonProperty
- int getPartitions()
+ @Override
+ public int partitionCount()
+ {
+ if (maxPartitions == 1) {
+ return maxPartitions;
+ } else {
+ // Number of actual partitions may be less than maxPartitions.
+ throw new IllegalStateException("Number of partitions not known.");
+ }
+ }
Review Comment:
This logic doesn't make sense to me. `partitionCount()` returns
`maxPartitions` only if `maxPartitions == 1`? Doesn't that mean that
`partitionCount()` either returns 1 or throws an exception?
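To make the concern concrete, here is a small sketch that copies only the `partitionCount()` body from the diff into a hypothetical `MaxCountSketch` class (the class name and the `throwsFor` helper are made up for illustration):

```java
// Hypothetical stand-in that copies only the partitionCount() logic from the diff.
final class MaxCountSketch
{
  private final int maxPartitions;

  MaxCountSketch(int maxPartitions)
  {
    this.maxPartitions = maxPartitions;
  }

  int partitionCount()
  {
    if (maxPartitions == 1) {
      return maxPartitions;
    } else {
      // Number of actual partitions may be less than maxPartitions.
      throw new IllegalStateException("Number of partitions not known.");
    }
  }

  // Illustration helper: true if partitionCount() throws for this maxPartitions.
  static boolean throwsFor(int maxPartitions)
  {
    try {
      new MaxCountSketch(maxPartitions).partitionCount();
      return false;
    }
    catch (IllegalStateException e) {
      return true;
    }
  }
}
```

With this shape, a caller can only ever observe the value 1 or an `IllegalStateException`, so the method name promises more than it can deliver.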
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.key.FrameComparisonWidget;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.frame.key.RowKeyReader;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameCursor;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.ZeroIndexedInts;
+import org.apache.druid.segment.join.JoinPrefixUtils;
+import org.apache.druid.segment.join.JoinType;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Processor for a sort-merge join of two inputs.
+ *
+ * Prerequisites:
+ *
+ * 1) Two inputs, both of which are stages; i.e. {@link ReadableInput#hasChannel()}.
+ *
+ * 2) Conditions are all simple equalities. Validated by {@link SortMergeJoinFrameProcessorFactory#validateCondition}
+ * and then transformed to lists of key columns by {@link SortMergeJoinFrameProcessorFactory#toKeyColumns}.
+ *
+ * 3) Both inputs are comprised of {@link org.apache.druid.frame.FrameType#ROW_BASED} frames, are sorted by the same
+ * key, and that key can be used to check the provided condition. Validated by
+ * {@link SortMergeJoinFrameProcessorFactory#validateInputFrameSignatures}.
+ *
+ * Algorithm:
+ *
+ * 1) Read current key from each side of the join.
+ *
+ * 2) If there is no match, emit or skip the row for the earlier key, as appropriate, based on the join type.
+ *
+ * 3) If there is a match, identify a complete set on one side or the other. (It doesn't matter which side has the
+ * complete set, but we need it on one of them.) We mark the first row for the key using {@link Tracker#markCurrent()}
+ * and find complete sets using {@link Tracker#hasCompleteSetForMark()}. Once we find one, we store it in
+ * {@link #trackerWithCompleteSetForCurrentKey}. If both sides have a complete set, we break ties by choosing the
+ * left side.
+ *
+ * 4) Once a complete set for the current key is identified: for each row on the *other* side, loop through the entire
+ * set of rows on {@link #trackerWithCompleteSetForCurrentKey}, and emit that many joined rows.
+ *
+ * 5) Once we process the final row on the *other* side, reset both marks with {@link Tracker#markCurrent()} and
+ * continue the algorithm.
+ */
+public class SortMergeJoinFrameProcessor implements FrameProcessor<Long>
+{
+ private static final int LEFT = 0;
+ private static final int RIGHT = 1;
+
+ /**
+ * Input channels for each side of the join. Two-element array: {@link #LEFT} and {@link #RIGHT}.
+ */
+ private final List<ReadableFrameChannel> inputChannels;
+
+ /**
+ * Trackers for each side of the join. Two-element array: {@link #LEFT} and {@link #RIGHT}.
+ */
+ private final List<Tracker> trackers;
+
+ private final WritableFrameChannel outputChannel;
+ private final FrameWriterFactory frameWriterFactory;
+ private final String rightPrefix;
+ private final JoinType joinType;
+ private final JoinColumnSelectorFactory joinColumnSelectorFactory = new JoinColumnSelectorFactory();
+ private FrameWriter frameWriter = null;
+
+ // Used by runIncrementally to defer certain logic to the next run.
+ private Runnable nextIterationRunnable = null;
+
+ // Used by runIncrementally to remember which tracker has the complete set for the current key.
+ private int trackerWithCompleteSetForCurrentKey = -1;
+
+ SortMergeJoinFrameProcessor(
+ ReadableInput left,
+ ReadableInput right,
+ WritableFrameChannel outputChannel,
+ FrameWriterFactory frameWriterFactory,
+ String rightPrefix,
+ List<List<KeyColumn>> keyColumns,
+ JoinType joinType
+ )
+ {
+ this.inputChannels = ImmutableList.of(left.getChannel(), right.getChannel());
+ this.outputChannel = outputChannel;
+ this.frameWriterFactory = frameWriterFactory;
+ this.rightPrefix = rightPrefix;
+ this.joinType = joinType;
+ this.trackers = ImmutableList.of(
+ new Tracker(left, keyColumns.get(LEFT)),
+ new Tracker(right, keyColumns.get(RIGHT))
+ );
+ }
+
+ @Override
+ public List<ReadableFrameChannel> inputChannels()
+ {
+ return inputChannels;
+ }
+
+ @Override
+ public List<WritableFrameChannel> outputChannels()
+ {
+ return Collections.singletonList(outputChannel);
+ }
+
+ @Override
+ public ReturnOrAwait<Long> runIncrementally(IntSet readableInputs) throws IOException
+ {
+ // Fetch enough frames such that each tracker has one readable row.
+ for (int i = 0; i < inputChannels.size(); i++) {
+ final Tracker tracker = trackers.get(i);
+ if (tracker.isAtEndOfPushedData() && !pushNextFrame(i)) {
+ return nextAwait();
+ }
+ }
+
+ // Initialize new output frame, if needed.
+ startNewFrameIfNeeded();
+
+ while (!allTrackersAreAtEnd()
+ && !trackers.get(LEFT).needsMoreData()
+ && !trackers.get(RIGHT).needsMoreData()) {
+ // Algorithm can proceed: not all trackers are at the end of their streams, and no tracker needs more data to
+ // read the current cursor or move it forward.
+ if (nextIterationRunnable != null) {
+ final Runnable tmp = nextIterationRunnable;
+ nextIterationRunnable = null;
+ tmp.run();
+ }
+
+ final int markCmp = compareMarks();
+
+ // Two rows match if the keys compare equal _and_ neither key has a null component. (x JOIN y ON x.a = y.a does
+ // not match rows where "x.a" is null.)
+ final boolean match = markCmp == 0 && trackers.get(LEFT).hasCompletelyNonNullMark();
+
+ // If marked keys are equal on both sides ("match"), at least one side must have a complete set of rows
+ // for the marked key.
+ if (match && trackerWithCompleteSetForCurrentKey < 0) {
+ for (int i = 0; i < inputChannels.size(); i++) {
+ final Tracker tracker = trackers.get(i);
+
+ // Fetch up to one frame from each tracker, to check if that tracker has a complete set.
+ // Can't fetch more than one frame, because channels are only guaranteed to have one frame per run.
+ if (tracker.hasCompleteSetForMark() || (pushNextFrame(i) && tracker.hasCompleteSetForMark())) {
+ trackerWithCompleteSetForCurrentKey = i;
+ break;
+ }
+ }
+
+ if (trackerWithCompleteSetForCurrentKey < 0) {
+ // Algorithm cannot proceed; fetch more frames on the next run.
+ return nextAwait();
+ }
+ }
+
+ if (match || (markCmp <= 0 && joinType.isLefty()) || (markCmp >= 0 && joinType.isRighty())) {
+ // Emit row, if there's room in the current frameWriter.
+ joinColumnSelectorFactory.cmp = markCmp;
+ joinColumnSelectorFactory.match = match;
+
+ if (!frameWriter.addSelection()) {
+ if (frameWriter.getNumRows() > 0) {
+ // Out of space in the current frame. Run again without moving cursors.
+ flushCurrentFrame();
+ return ReturnOrAwait.runAgain();
+ } else {
+ throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+ }
+ }
+ }
+
+ // Advance one or both trackers.
+ if (match) {
+ // Matching keys. First advance the tracker with the complete set.
+ final Tracker tracker = trackers.get(trackerWithCompleteSetForCurrentKey);
+ final Tracker otherTracker = trackers.get(trackerWithCompleteSetForCurrentKey == LEFT ? RIGHT : LEFT);
+
+ tracker.advance();
+ if (!tracker.isCurrentSameKeyAsMark()) {
+ // Reached end of complete set. Advance the other tracker.
+ otherTracker.advance();
+
+ // On next iteration (when we're sure to have data) either rewind the complete-set tracker, or update marks
+ // of both, as appropriate.
+ onNextIteration(() -> {
+ if (otherTracker.isCurrentSameKeyAsMark()) {
+ otherTracker.markCurrent(); // Set mark to enable cleanup of old frames.
+ tracker.rewindToMark();
+ } else {
+ // Reached end of the other side too. Advance marks on both trackers.
+ tracker.markCurrent();
+ otherTracker.markCurrent();
+ trackerWithCompleteSetForCurrentKey = -1;
+ }
+ });
+ }
+ } else {
+ final int trackerToAdvance;
+
+ if (markCmp < 0) {
+ trackerToAdvance = LEFT;
+ } else if (markCmp > 0) {
+ trackerToAdvance = RIGHT;
+ } else {
+ // Key is null on both sides. Note that there is a preference for running through the left side first
+ // on a FULL join. It doesn't really matter which side we run through first, but we do need to be consistent
+ // for the benefit of the logic in "shouldEmitColumnValue".
+ trackerToAdvance = joinType.isLefty() ? LEFT : RIGHT;
+ }
+
+ final Tracker tracker = trackers.get(trackerToAdvance);
+
+ tracker.advance();
+
+ // On next iteration (when we're sure to have data), update mark if the key changed.
+ onNextIteration(() -> {
+ if (!tracker.isCurrentSameKeyAsMark()) {
+ tracker.markCurrent();
+ trackerWithCompleteSetForCurrentKey = -1;
+ }
+ });
+ }
+ }
+
+ if (allTrackersAreAtEnd()) {
+ flushCurrentFrame();
+ return ReturnOrAwait.returnObject(0L);
+ } else {
+ // Keep reading.
+ return nextAwait();
+ }
+ }
+
+ @Override
+ public void cleanup() throws IOException
+ {
+ FrameProcessors.closeAll(inputChannels(), outputChannels(), frameWriter, () -> trackers.forEach(Tracker::clear));
+ }
+
+ /**
+ * Returns a {@link ReturnOrAwait#awaitAll} for the channel numbers that need more data and have not yet hit their
+ * buffered-bytes limit, {@link Limits#MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN}.
+ *
+ * If all channels have hit their limit, throws {@link MSQException} with {@link TooManyRowsWithSameKeyFault}.
+ */
+ private ReturnOrAwait<Long> nextAwait()
+ {
+ final IntSet awaitSet = new IntOpenHashSet();
+ int trackerAtLimit = -1;
+
+ for (int i = 0; i < inputChannels.size(); i++) {
+ final Tracker tracker = trackers.get(i);
+ if (tracker.needsMoreData()) {
+ if (tracker.totalBytesBuffered() < Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+ awaitSet.add(i);
+ } else if (trackerAtLimit < 0) {
+ trackerAtLimit = i;
+ }
+ }
+ }
+
+ if (awaitSet.isEmpty() && trackerAtLimit > 0) {
+ // All trackers that need more data are at their max buffered bytes limit. Generate a nice exception.
+ final Tracker tracker = trackers.get(trackerAtLimit);
+ if (tracker.totalBytesBuffered() > Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+ // Generate a nice exception.
+ throw new MSQException(
+ new TooManyRowsWithSameKeyFault(
+ tracker.readMarkKey(),
+ tracker.totalBytesBuffered(),
+ Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN
+ )
+ );
+ }
+ }
+
+ return ReturnOrAwait.awaitAll(awaitSet);
+ }
+
+ /**
+ * Whether all trackers return true from {@link Tracker#isAtEnd()}.
+ */
+ private boolean allTrackersAreAtEnd()
+ {
+ for (Tracker tracker : trackers) {
+ if (!tracker.isAtEnd()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Compares the marked rows of the two {@link #trackers}.
+ *
+ * @throws IllegalStateException if either tracker does not have a marked row and is not completely done
+ */
+ private int compareMarks()
+ {
+ final Tracker leftTracker = trackers.get(LEFT);
+ final Tracker rightTracker = trackers.get(RIGHT);
+
+ Preconditions.checkState(leftTracker.hasMark() || leftTracker.isAtEnd(), "left.hasMark || left.isAtEnd");
+ Preconditions.checkState(rightTracker.hasMark() || rightTracker.isAtEnd(), "right.hasMark || right.isAtEnd");
+
+ if (!leftTracker.hasMark()) {
+ return rightTracker.markFrame < 0 ? 0 : 1;
+ } else if (!rightTracker.hasMark()) {
+ return -1;
+ } else {
+ final FrameHolder leftHolder = leftTracker.holders.get(leftTracker.markFrame);
+ final FrameHolder rightHolder = rightTracker.holders.get(rightTracker.markFrame);
+ return leftHolder.comparisonWidget.compare(
+ leftTracker.markRow,
+ rightHolder.comparisonWidget,
+ rightTracker.markRow
+ );
+ }
+ }
+
+ /**
+ * Pushes a frame from the indicated channel into the appropriate tracker. Returns true if a frame was pushed
+ * or if the channel is finished.
+ */
+ private boolean pushNextFrame(final int channelNumber)
+ {
+ final ReadableFrameChannel channel = inputChannels.get(channelNumber);
+ final Tracker tracker = trackers.get(channelNumber);
+
+ if (!channel.isFinished() && !channel.canRead()) {
+ return false;
+ } else if (channel.isFinished()) {
+ tracker.push(null);
+ return true;
+ } else {
+ final Frame frame = channel.read();
+
+ if (frame.numRows() == 0) {
+ // Skip, read next.
+ return false;
+ } else {
+ tracker.push(frame);
+ return true;
+ }
+ }
+ }
+
+ private void onNextIteration(final Runnable runnable)
+ {
+ if (nextIterationRunnable != null) {
+ throw new ISE("postAdvanceRunnable already set");
Review Comment:
Is there any extra information that could be interpolated into this message
to help whoever runs into this exception figure out what is going on?
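As a rough idea of what that could look like, here is a sketch that includes some processor state in the message. It assumes the rest of `onNextIteration` just stores the runnable (the hunk is cut off before that), and `DeferredStepSketch` plus the choice of which field to print are hypothetical. Note that the current message says `postAdvanceRunnable` while the field is named `nextIterationRunnable`; the sketch uses the field name.

```java
// Hypothetical stand-in for the processor; only the deferred-runnable slot and
// one piece of state that might be worth printing are modeled.
final class DeferredStepSketch
{
  private Runnable nextIterationRunnable = null;
  private int trackerWithCompleteSetForCurrentKey = -1;

  void onNextIteration(final Runnable runnable)
  {
    if (nextIterationRunnable != null) {
      // Include state in the message so the failure is easier to diagnose.
      throw new IllegalStateException(
          String.format(
              "nextIterationRunnable already set; trackerWithCompleteSetForCurrentKey[%d]",
              trackerWithCompleteSetForCurrentKey
          )
      );
    }

    // Assumption: the real method stores the runnable for the next run.
    nextIterationRunnable = runnable;
  }
}
```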
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+
+import javax.annotation.Nullable;
+
+public class HashShuffleSpec implements ShuffleSpec
+{
+ public static final String TYPE = "hash";
+
+ private final ClusterBy clusterBy;
+ private final int numPartitions;
+
+ @JsonCreator
+ public HashShuffleSpec(
+ @JsonProperty("clusterBy") final ClusterBy clusterBy,
+ @JsonProperty("partitions") final int numPartitions
+ )
+ {
+ this.clusterBy = clusterBy;
+ this.numPartitions = numPartitions;
+
+ if (clusterBy.getBucketByCount() > 0) {
+ // Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
+ throw new IAE("Cannot bucket with %s partitioning", TYPE);
+ }
Review Comment:
Why does this spec have to validate against something that is only
supported by a different spec? Is there a way to arrange things so that only
the spec that supports bucket-by can be given one?
Additionally, if you interpolated the `clusterBy` into the error message,
would it tell the user which part of their query is misbehaving? (I'm
imagining a complex query with lots of WITH clauses and sub-queries; it can
be hard to figure out which one is acting up.)
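A minimal sketch of the second suggestion, assuming the `clusterBy` has a readable `toString()`. `ClusterBySketch` and `HashSpecSketch` are hypothetical stand-ins, and the exact message wording is only an example:

```java
import java.util.List;

// Hypothetical stand-in for ClusterBy: column names plus a bucket-by count.
final class ClusterBySketch
{
  final List<String> columns;
  final int bucketByCount;

  ClusterBySketch(List<String> columns, int bucketByCount)
  {
    this.columns = columns;
    this.bucketByCount = bucketByCount;
  }

  @Override
  public String toString()
  {
    return "ClusterBy{columns=" + columns + ", bucketByCount=" + bucketByCount + "}";
  }
}

// Hypothetical stand-in for the constructor check in HashShuffleSpec.
final class HashSpecSketch
{
  static final String TYPE = "hash";

  static void validate(ClusterBySketch clusterBy)
  {
    if (clusterBy.bucketByCount > 0) {
      // Interpolate the clusterBy so the user can find the offending part of the query.
      throw new IllegalArgumentException(
          String.format("Cannot bucket with %s partitioning (clusterBy[%s])", TYPE, clusterBy)
      );
    }
  }
}
```

Whether the column names alone are enough to locate the right sub-query is an open question; they should at least narrow it down.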
##########
processing/src/main/java/org/apache/druid/frame/key/ClusterBy.java:
##########
@@ -58,6 +59,21 @@ public ClusterBy(
if (bucketByCount < 0 || bucketByCount > columns.size()) {
throw new IAE("Invalid bucketByCount [%d]", bucketByCount);
}
+
+ // Key must be 100% sortable or 100% nonsortable. If empty, call it sortable.
+ boolean sortable = true;
+
+ for (int i = 0; i < columns.size(); i++) {
+ final KeyColumn column = columns.get(i);
+
+ if (i == 0) {
+ sortable = column.order().sortable();
+ } else if (sortable != column.order().sortable()) {
+ throw new IAE("Cannot mix sortable and unsortable key columns");
+ }
+ }
+
+ this.sortable = sortable;
Review Comment:
Code nit, could also do:
```
if (columns.isEmpty()) {
  // No columns means the key is sortable: all rows are effectively sorted by
  // the same nonexistent, constant value.
  this.sortable = true;
} else {
  final boolean sortable = columns.get(0).order().sortable();
  for (int i = 1; i < columns.size(); ++i) {
    if (sortable != columns.get(i).order().sortable()) {
      throw new IAE("Cannot mix sortable and unsortable key columns");
    }
  }
  this.sortable = sortable;
}
```
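For anyone who wants to check the edge cases, the same logic can be exercised standalone. This is only a sketch: `SortableCheckSketch` and its `Order` enum are hypothetical stand-ins for the real key column types, and `IllegalArgumentException` replaces `IAE`.

```java
import java.util.List;

final class SortableCheckSketch
{
  // Hypothetical stand-in for the key column order; only sortable() matters here.
  enum Order
  {
    ASCENDING(true),
    DESCENDING(true),
    NONE(false);

    private final boolean sortable;

    Order(boolean sortable)
    {
      this.sortable = sortable;
    }

    boolean sortable()
    {
      return sortable;
    }
  }

  // Returns whether the key is sortable; throws if sortable and unsortable columns are mixed.
  static boolean computeSortable(List<Order> columns)
  {
    if (columns.isEmpty()) {
      // An empty key is treated as sortable.
      return true;
    }

    final boolean sortable = columns.get(0).sortable();
    for (int i = 1; i < columns.size(); ++i) {
      if (sortable != columns.get(i).sortable()) {
        throw new IllegalArgumentException("Cannot mix sortable and unsortable key columns");
      }
    }
    return sortable;
  }
}
```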
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java:
##########
@@ -288,37 +322,72 @@ public boolean mustGatherResultKeyStatistics()
return shuffleSpec != null && shuffleSpec.needsStatistics();
}
- public Either<Long, ClusterByPartitions> generatePartitionsForShuffle(
+ public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
@Nullable ClusterByStatisticsCollector collector
)
{
if (shuffleSpec == null) {
throw new ISE("No shuffle for stage[%d]", getStageNumber());
+ } else if (shuffleSpec.kind() != ShuffleKind.GLOBAL_SORT) {
+ throw new ISE(
+ "Shuffle of kind [%s] cannot generate partition boundaries for stage[%d]",
+ shuffleSpec.kind(),
+ getStageNumber()
+ );
} else if (mustGatherResultKeyStatistics() && collector == null) {
throw new ISE("Statistics required, but not gathered for stage[%d]", getStageNumber());
} else if (!mustGatherResultKeyStatistics() && collector != null) {
throw new ISE("Statistics gathered, but not required for stage[%d]", getStageNumber());
} else {
- return shuffleSpec.generatePartitions(collector, MAX_PARTITIONS);
+ return shuffleSpec.generatePartitionsForGlobalSort(collector, MAX_PARTITIONS);
}
}
public ClusterByStatisticsCollector createResultKeyStatisticsCollector(final int maxRetainedBytes)
{
if (!mustGatherResultKeyStatistics()) {
- throw new ISE("No statistics needed");
+ throw new ISE("No statistics needed for stage[%d]", getStageNumber());
}
return ClusterByStatisticsCollectorImpl.create(
- shuffleSpec.getClusterBy(),
+ shuffleSpec.clusterBy(),
signature,
maxRetainedBytes,
PARTITION_STATS_MAX_BUCKETS,
- shuffleSpec.doesAggregateByClusterKey(),
+ shuffleSpec.doesAggregate(),
shuffleCheckHasMultipleValues
);
Review Comment:
I'm not sure of the semantics of `mustGatherResultKeyStatistics`, but "must
gather" and "might choose to gather if the user asked nicely" could be seen
as different things. If a user asked for full statistics over their run,
would the "mustGather" method end up returning true?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+
+import javax.annotation.Nullable;
+
+public class HashShuffleSpec implements ShuffleSpec
+{
+ public static final String TYPE = "hash";
+
+ private final ClusterBy clusterBy;
+ private final int numPartitions;
+
+ @JsonCreator
+ public HashShuffleSpec(
+ @JsonProperty("clusterBy") final ClusterBy clusterBy,
+ @JsonProperty("partitions") final int numPartitions
+ )
+ {
+ this.clusterBy = clusterBy;
+ this.numPartitions = numPartitions;
+
+ if (clusterBy.getBucketByCount() > 0) {
+ // Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
+ throw new IAE("Cannot bucket with %s partitioning", TYPE);
+ }
+ }
+
+ @Override
+ public ShuffleKind kind()
+ {
+ return clusterBy.sortable() && !clusterBy.isEmpty() ? ShuffleKind.HASH_LOCAL_SORT : ShuffleKind.HASH;
+ }
+
+ @Override
+ @JsonProperty
+ public ClusterBy clusterBy()
+ {
+ return clusterBy;
+ }
+
+ @Override
+ public boolean doesAggregate()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean needsStatistics()
+ {
+ return false;
+ }
+
+ @Override
+ public Either<Long, ClusterByPartitions> generatePartitionsForGlobalSort(
+ @Nullable ClusterByStatisticsCollector collector,
+ int maxNumPartitions
+ )
+ {
+ throw new IllegalStateException("Not a global sort");
+ }
Review Comment:
Can the global sort methods exist only on the things that do global sort?
One of the most painful things about AggregatorFactory and similar
interfaces is the number of methods that can be ignored and/or are only
special purpose. It would be nice to keep special-purpose methods in their
own dedicated place if at all possible.
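One possible shape, sketched with hypothetical names (`ShuffleSpecSketch`, `GlobalSortShuffleSpecSketch`, and so on are not from the PR, and the partition result is reduced to an `int`): put the global-sort method on a sub-interface, so a hash spec never has to implement it with a throw.

```java
// Hypothetical base interface: only what every shuffle spec can answer.
interface ShuffleSpecSketch
{
  String kind();
}

// Hypothetical sub-interface: global-sort-only methods live here.
interface GlobalSortShuffleSpecSketch extends ShuffleSpecSketch
{
  int generatePartitionsForGlobalSort(int maxNumPartitions);
}

final class HashSketch implements ShuffleSpecSketch
{
  @Override
  public String kind()
  {
    return "hash";
  }
}

final class GlobalSortSketch implements GlobalSortShuffleSpecSketch
{
  @Override
  public String kind()
  {
    return "globalSort";
  }

  @Override
  public int generatePartitionsForGlobalSort(int maxNumPartitions)
  {
    // Placeholder logic: pretend the data wants 4 partitions, capped by the maximum.
    return Math.min(4, maxNumPartitions);
  }
}

final class PartitionBoundarySketch
{
  // The caller checks the type once, instead of every spec carrying a throwing stub.
  static int partitionsFor(ShuffleSpecSketch spec, int maxNumPartitions)
  {
    if (!(spec instanceof GlobalSortShuffleSpecSketch)) {
      throw new IllegalStateException(
          "Shuffle of kind [" + spec.kind() + "] cannot generate partition boundaries"
      );
    }
    return ((GlobalSortShuffleSpecSketch) spec).generatePartitionsForGlobalSort(maxNumPartitions);
  }
}
```

The trade-off is an `instanceof` check (or an equivalent cast) at the one call site that needs partition boundaries, in exchange for not carrying a throwing stub in every non-global-sort spec.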
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]