gianm commented on code in PR #16533:
URL: https://github.com/apache/druid/pull/16533#discussion_r1702047874
##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java:
##########
@@ -54,7 +57,8 @@ public void setUp()
@Test
public void testRead()
{
- try (final VectorCursor cursor = makeCursor()) {
+ try (final CursorMaker maker = makeCursor()) {
Review Comment:
`makeCursorMaker()`?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -294,15 +289,22 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
final Frame frame = inputChannel.read();
final FrameSegment frameSegment = new FrameSegment(frame,
inputFrameReader, SegmentId.dummy("scan"));
- final long rowsFlushed = setNextCursor(
- Iterables.getOnlyElement(
- makeCursors(
- query.withQuerySegmentSpec(new
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
- mapSegment(frameSegment).asStorageAdapter()
- ).toList()
- ),
- frameSegment
- );
+ final StorageAdapter adapter =
mapSegment(frameSegment).asStorageAdapter();
+ if (adapter == null) {
+ throw new ISE(
+ "Null storage adapter found. Probably trying to issue a query
against a segment being memory unmapped."
+ );
+ }
+
+ final CursorMaker maker =
closer.register(adapter.asCursorMaker(query.asCursorBuildSpec(null)));
+ final Cursor nextCursor = maker.makeCursor();
+
+ if (nextCursor == null) {
+ // no cursor
+ maker.close();
Review Comment:
`maker` is already registered with the `closer`, so I think there's no need
to do `maker.close()` here. The old code hadn't registered the `cursorYielder`
at this point.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+ public static final CursorBuildSpec FULL_SCAN =
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+ public static CursorBuildSpecBuilder builder()
+ {
+ return new CursorBuildSpecBuilder();
+ }
+
+ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+ {
+ return new CursorBuildSpecBuilder(spec);
+ }
+
+ @Nullable
+ private final Filter filter;
+ private final Interval interval;
+ private final Granularity granularity;
+ @Nullable
+ private final List<String> groupingColumns;
+ private final VirtualColumns virtualColumns;
+ @Nullable
+ private final List<AggregatorFactory> aggregators;
+
+ private final QueryContext queryContext;
+
+ private final boolean descending;
+ @Nullable
+ private final QueryMetrics<?> queryMetrics;
+
+ public CursorBuildSpec(
+ @Nullable Filter filter,
+ Interval interval,
+ Granularity granularity,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns,
+ @Nullable List<AggregatorFactory> aggregators,
+ QueryContext queryContext,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ this.filter = filter;
+ this.interval = interval;
+ this.granularity = granularity;
+ this.groupingColumns = groupingColumns;
+ this.virtualColumns = virtualColumns;
+ this.aggregators = aggregators;
+ this.descending = descending;
+ this.queryContext = queryContext;
+ this.queryMetrics = queryMetrics;
+ }
+
+ @Nullable
+ public Filter getFilter()
+ {
+ return filter;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ public Granularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @Nullable
+ public List<String> getGroupingColumns()
+ {
+ return groupingColumns;
+ }
+
+ public VirtualColumns getVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
+ @Nullable
+ public List<AggregatorFactory> getAggregators()
+ {
+ return aggregators;
+ }
+
+ public boolean isDescending()
Review Comment:
Since we're changing the interfaces, better to drop `descending` and replace
it with `sortOrder`. That's useful for projections since different projections
might have different sort orders.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -252,21 +245,23 @@ protected ReturnOrAwait<Unit> runWithSegment(final
SegmentWithDescriptor segment
if (cursor == null) {
final ResourceHolder<Segment> segmentHolder =
closer.register(segment.getOrLoad());
- final Yielder<Cursor> cursorYielder = Yielders.each(
- makeCursors(
- query.withQuerySegmentSpec(new
SpecificSegmentSpec(segment.getDescriptor())),
- mapSegment(segmentHolder.get()).asStorageAdapter()
- )
- );
+ final StorageAdapter adapter =
mapSegment(segmentHolder.get()).asStorageAdapter();
+ if (adapter == null) {
+ throw new ISE(
+ "Null storage adapter found. Probably trying to issue a query
against a segment being memory unmapped."
+ );
+ }
- if (cursorYielder.isDone()) {
+ final CursorMaker maker =
closer.register(adapter.asCursorMaker(query.asCursorBuildSpec(null)));
+ final Cursor nextCursor = maker.makeCursor();
+
+ if (nextCursor == null) {
// No cursors!
- cursorYielder.close();
+ maker.close();
Review Comment:
`maker` is already registered with the `closer`, so I think there's no need
to do `maker.close()` here. The old code hadn't registered the `cursorYielder`
at this point.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+ public static final CursorBuildSpec FULL_SCAN =
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+ public static CursorBuildSpecBuilder builder()
+ {
+ return new CursorBuildSpecBuilder();
+ }
+
+ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+ {
+ return new CursorBuildSpecBuilder(spec);
+ }
+
+ @Nullable
+ private final Filter filter;
+ private final Interval interval;
+ private final Granularity granularity;
+ @Nullable
+ private final List<String> groupingColumns;
+ private final VirtualColumns virtualColumns;
+ @Nullable
+ private final List<AggregatorFactory> aggregators;
+
+ private final QueryContext queryContext;
+
+ private final boolean descending;
+ @Nullable
+ private final QueryMetrics<?> queryMetrics;
+
+ public CursorBuildSpec(
+ @Nullable Filter filter,
+ Interval interval,
+ Granularity granularity,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns,
+ @Nullable List<AggregatorFactory> aggregators,
+ QueryContext queryContext,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ this.filter = filter;
+ this.interval = interval;
+ this.granularity = granularity;
+ this.groupingColumns = groupingColumns;
+ this.virtualColumns = virtualColumns;
+ this.aggregators = aggregators;
+ this.descending = descending;
+ this.queryContext = queryContext;
+ this.queryMetrics = queryMetrics;
+ }
+
+ @Nullable
+ public Filter getFilter()
+ {
+ return filter;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ public Granularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @Nullable
+ public List<String> getGroupingColumns()
+ {
+ return groupingColumns;
+ }
+
+ public VirtualColumns getVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
+ @Nullable
+ public List<AggregatorFactory> getAggregators()
+ {
+ return aggregators;
+ }
+
+ public boolean isDescending()
+ {
+ return descending;
+ }
+
+ public QueryContext getQueryContext()
+ {
+ return queryContext;
+ }
+
+ @Nullable
+ public QueryMetrics<?> getQueryMetrics()
+ {
+ return queryMetrics;
+ }
+
+ public static class CursorBuildSpecBuilder
+ {
+ @Nullable
+ private Filter filter;
+ private Interval interval = Intervals.ETERNITY;
+ private Granularity granularity = Granularities.NONE;
+
+ @Nullable
+ private List<String> groupingColumns = null;
+ private VirtualColumns virtualColumns = VirtualColumns.EMPTY;
+ @Nullable
+ private List<AggregatorFactory> aggregators = null;
+ private boolean descending = false;
+
+ private QueryContext queryContext = QueryContext.empty();
+ @Nullable
+ private QueryMetrics<?> queryMetrics;
+
+ private CursorBuildSpecBuilder()
+ {
+ //
Review Comment:
`//` what? Possibly should be `// Do nothing.`
##########
processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java:
##########
@@ -81,18 +82,22 @@ public RowSignature getRowSignature()
public Sequence<Object[]> getRowsAsSequence()
{
-
+ final Closer closer = Closer.create();
final Sequence<Cursor> cursorSequence =
Sequences.simple(frames)
- .flatMap(
+ .map(
Review Comment:
IMO, it's better closeable management to use a `flatMap` with single-cursor
sequences that individually do `withBaggage(maker)`. That way the makers get
closed as soon as we're done with them, instead of all at the end.
It doesn't really matter right now for frames, but I'm thinking in case it
does matter in the future.
##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorMaker.java:
##########
@@ -668,4 +724,43 @@
return new DescendingTimestampCheckingOffset(baseOffset.clone(),
timestamps, timeLimit, allWithinThreshold);
}
}
+
+ private final class CursorResources implements Closeable
Review Comment:
seems like a good point ☝️
##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
*/
public interface Cursor
{
+ /**
+ * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the
row values at the current position of
+ * the cursor
+ */
ColumnSelectorFactory getColumnSelectorFactory();
- DateTime getTime();
+
+ /**
+ * Advance to the next row in the cursor, checking if thread has been
interrupted after advancing and possibly
+ * throwing {@link QueryInterruptedException} if so
+ */
void advance();
+
+ /**
+ * Advance to the next row in the cursor
+ */
void advanceUninterruptibly();
+
+ /**
+ * Check if there are any additional rows in the cursor
+ */
boolean isDone();
+
+ /**
+ * Check if there are any additional rows in the cursor, or if the thread
has been interrupted
+ */
boolean isDoneOrInterrupted();
+
+ /**
+ * Mark a position on the cursor which can recalled with {@link
#resetToMark()}
+ */
+ void mark(DateTime mark);
+
+ /**
+ * Reset to position set by {@link #mark(DateTime)}
+ */
+ void resetToMark();
+
+ /**
+ * Reset to start of cursor
Review Comment:
Should specify that this also clears the mark.
##########
processing/src/main/java/org/apache/druid/segment/CursorMaker.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.vector.VectorCursor;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+public interface CursorMaker extends Closeable
Review Comment:
I wonder if `CursorHolder` is a better name than `CursorMaker`? Using
"holder" rather than "maker" better emphasizes the fact that the `CursorMaker`
(or `CursorHolder`) is closeable — it suggests it's a container that wraps a
`Cursor` or `VectorCursor` + the resources used by the cursor. It also avoids
the awkward name `CursorMakerFactory`.
If we did that, I think we'd also change:
- `makeCursor` -> `asCursor()` or `getCursor()`
- `makeVectorCursor` -> `asVectorCursor()` or `getVectorCursor()`
- `CursorMakerFactory` -> `CursorHolderFactory` or even stick with
`CursorFactory`
- `CursorMakerFactory#asCursorMaker` -> `#makeCursorHolder`
- possibly update `CursorMaker` (`CursorHolder`) implementations to stash
the cursor in a local field, so `asCursor()` or `getCursor()` could be called
multiple times and return the same cursor. This would enable query engines to
pass around the `CursorHolder` as a unified `Closeable` + holder for the actual
cursor, which would potentially simplify the logic around tracking that things
are closed properly.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+ public static final CursorBuildSpec FULL_SCAN =
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+ public static CursorBuildSpecBuilder builder()
+ {
+ return new CursorBuildSpecBuilder();
+ }
+
+ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+ {
+ return new CursorBuildSpecBuilder(spec);
+ }
+
+ @Nullable
+ private final Filter filter;
+ private final Interval interval;
+ private final Granularity granularity;
+ @Nullable
+ private final List<String> groupingColumns;
+ private final VirtualColumns virtualColumns;
+ @Nullable
+ private final List<AggregatorFactory> aggregators;
+
+ private final QueryContext queryContext;
+
+ private final boolean descending;
+ @Nullable
+ private final QueryMetrics<?> queryMetrics;
+
+ public CursorBuildSpec(
+ @Nullable Filter filter,
+ Interval interval,
+ Granularity granularity,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns,
+ @Nullable List<AggregatorFactory> aggregators,
+ QueryContext queryContext,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ this.filter = filter;
+ this.interval = interval;
+ this.granularity = granularity;
+ this.groupingColumns = groupingColumns;
+ this.virtualColumns = virtualColumns;
+ this.aggregators = aggregators;
+ this.descending = descending;
+ this.queryContext = queryContext;
+ this.queryMetrics = queryMetrics;
+ }
+
+ @Nullable
+ public Filter getFilter()
+ {
+ return filter;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ public Granularity getGranularity()
Review Comment:
Something is odd with the API design here. With the changes to cursor
sequences, I would think that granularity should no longer be a concept that
the cursors need to think about. It should be something that is dealt with at
the query engine level. In fact, `getGranularity()` doesn't appear to be used
by the `CursorMaker` implementations, except sometimes to throw exceptions
saying that granularities other than `ALL` aren't supported.
It may be that this is here to support future work with projections. If
that's the reason for having it, I would recommend we _don't_ use `granularity`
in determining what projections to use; better to use expressions or virtual
columns. Like, rather than encoding that a projection has `granularity: hour`,
we should encode that it has a dimension `timestamp_floor(__time, 'PT1H')`. The
`granularity` concept involves too much weirdness and it's best to have it
totally outside the cursor logic.
##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
*/
public interface Cursor
{
+ /**
+ * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the
row values at the current position of
+ * the cursor
+ */
ColumnSelectorFactory getColumnSelectorFactory();
- DateTime getTime();
+
+ /**
+ * Advance to the next row in the cursor, checking if thread has been
interrupted after advancing and possibly
+ * throwing {@link QueryInterruptedException} if so
+ */
void advance();
+
+ /**
+ * Advance to the next row in the cursor
+ */
void advanceUninterruptibly();
+
+ /**
+ * Check if there are any additional rows in the cursor
+ */
boolean isDone();
+
+ /**
+ * Check if there are any additional rows in the cursor, or if the thread
has been interrupted
+ */
boolean isDoneOrInterrupted();
+
+ /**
+ * Mark a position on the cursor which can recalled with {@link
#resetToMark()}
+ */
+ void mark(DateTime mark);
Review Comment:
This should include more info on the `mark` parameter. It appears to be used
by `IncrementalIndexCursor` and `RowBasedCursor`. But other popular
implementations (`FrameCursor`, `QueryableIndexCursor`) ignore it. Query
engines need to be written in such a way that they work properly on all cursor
types, so we should have some commentary on how to properly use this method,
and what behavioral guarantees apply.
##########
processing/src/main/java/org/apache/druid/segment/CursorMaker.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.vector.VectorCursor;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+public interface CursorMaker extends Closeable
+{
+ /**
+ * Create a {@link Cursor} for use with non-vectorized query engines.
+ */
+ @Nullable
+ Cursor makeCursor();
+
+ /**
+ * Create a {@link VectorCursor} for use with vectorized query engines.
+ */
+ @Nullable
+ default VectorCursor makeVectorCursor()
+ {
+ throw new UOE("Cannot vectorize. Check 'canVectorize' before calling
'makeVectorCursor' on %s.", this.getClass().getName());
+ }
+
+ /**
+ * Returns true if this {@link CursorMaker} supports calling {@link
#makeVectorCursor()}.
+ */
+ default boolean canVectorize()
+ {
+ return false;
+ }
+
+ /**
+ * Release any resources acquired by cursors.
+ */
+ @Override
+ default void close()
+ {
+ // nothing to close
+ }
+
+ CursorMaker EMPTY = new CursorMaker()
+ {
+ @Override
+ public boolean canVectorize()
+ {
+ return true;
+ }
+
+ @Override
+ public Cursor makeCursor()
Review Comment:
`@Nullable`
##########
processing/src/main/java/org/apache/druid/query/CursorGranularizer.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+public class CursorGranularizer
+{
+ @Nullable
+ public static CursorGranularizer create(
+ final StorageAdapter storageAdapter,
+ final Cursor cursor,
+ final Granularity granularity,
+ final Interval queryInterval,
+ final boolean descending
+ )
+ {
+ final DateTime minTime = storageAdapter.getMinTime();
+ final DateTime maxTime = storageAdapter.getMaxTime();
+
+ final Interval storageAdapterInterval = new Interval(minTime,
granularity.bucketEnd(maxTime));
+ final Interval clippedQueryInterval =
queryInterval.overlap(storageAdapterInterval);
+
+ if (clippedQueryInterval == null) {
+ return null;
+ }
+
+ Iterable<Interval> bucketIterable =
granularity.getIterable(clippedQueryInterval);
+ if (descending) {
+ bucketIterable = Lists.reverse(ImmutableList.copyOf(bucketIterable));
+ }
+ final Interval firstBucket =
granularity.bucket(clippedQueryInterval.getStart());
+
+ final ColumnValueSelector timeSelector;
+ if (firstBucket.contains(clippedQueryInterval)) {
+ // Only one bucket, no need to read the time column.
+ assert Iterables.size(bucketIterable) == 1;
+ timeSelector = null;
+ } else {
+ // Multiple buckets, need to read the time column to know when we move
from one to the next.
+ timeSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ }
+
+ return new CursorGranularizer(cursor, bucketIterable, timeSelector,
descending);
+ }
+
+ // And a cursor that has been made from it.
Review Comment:
This comment doesn't make sense.
##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+ public static final CursorBuildSpec FULL_SCAN =
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+ public static CursorBuildSpecBuilder builder()
+ {
+ return new CursorBuildSpecBuilder();
+ }
+
+ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+ {
+ return new CursorBuildSpecBuilder(spec);
+ }
+
+ @Nullable
+ private final Filter filter;
+ private final Interval interval;
+ private final Granularity granularity;
+ @Nullable
+ private final List<String> groupingColumns;
+ private final VirtualColumns virtualColumns;
+ @Nullable
+ private final List<AggregatorFactory> aggregators;
+
+ private final QueryContext queryContext;
+
+ private final boolean descending;
+ @Nullable
+ private final QueryMetrics<?> queryMetrics;
+
+ public CursorBuildSpec(
+ @Nullable Filter filter,
+ Interval interval,
+ Granularity granularity,
+ @Nullable List<String> groupingColumns,
+ VirtualColumns virtualColumns,
+ @Nullable List<AggregatorFactory> aggregators,
+ QueryContext queryContext,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ this.filter = filter;
+ this.interval = interval;
+ this.granularity = granularity;
+ this.groupingColumns = groupingColumns;
+ this.virtualColumns = virtualColumns;
+ this.aggregators = aggregators;
+ this.descending = descending;
+ this.queryContext = queryContext;
+ this.queryMetrics = queryMetrics;
+ }
+
+ @Nullable
+ public Filter getFilter()
+ {
+ return filter;
+ }
+
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ public Granularity getGranularity()
+ {
+ return granularity;
+ }
+
+ @Nullable
+ public List<String> getGroupingColumns()
+ {
+ return groupingColumns;
+ }
+
+ public VirtualColumns getVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
+ @Nullable
+ public List<AggregatorFactory> getAggregators()
+ {
+ return aggregators;
+ }
+
+ public boolean isDescending()
+ {
+ return descending;
+ }
+
+ public QueryContext getQueryContext()
+ {
+ return queryContext;
+ }
+
+ @Nullable
+ public QueryMetrics<?> getQueryMetrics()
+ {
+ return queryMetrics;
+ }
+
+ public static class CursorBuildSpecBuilder
+ {
+ @Nullable
+ private Filter filter;
+ private Interval interval = Intervals.ETERNITY;
+ private Granularity granularity = Granularities.NONE;
+
+ @Nullable
+ private List<String> groupingColumns = null;
Review Comment:
nit: no reason to have `= null`; fields are initialized to `null` by default anyway
(like `filter`).
##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
*/
public interface Cursor
{
+ /**
+ * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the
row values at the current position of
+ * the cursor
+ */
ColumnSelectorFactory getColumnSelectorFactory();
- DateTime getTime();
+
+ /**
+ * Advance to the next row in the cursor, checking if thread has been
interrupted after advancing and possibly
+ * throwing {@link QueryInterruptedException} if so
+ */
void advance();
+
+ /**
+ * Advance to the next row in the cursor
+ */
void advanceUninterruptibly();
+
+ /**
+ * Check if there are any additional rows in the cursor
Review Comment:
IMO the meaning of `isDone` is a little different than this text suggests.
The text makes it sound like `Iterator#hasNext`, but the behavior is subtly
different. The `isDone` method is not checking if there are additional rows
after the current cursor position; it checks if the current cursor position is
valid. That is, expected usage is:
```
while (!cursor.isDone()) {
// do stuff
cursor.advance();
}
```
Rather than the typical iterator pattern:
```
while (iter.hasNext()) {
T obj = iter.next();
// do stuff
}
```
##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorMaker.java:
##########
@@ -483,19 +471,13 @@ public ColumnSelectorFactory getColumnSelectorFactory()
return columnSelectorFactory;
}
- @Override
- public DateTime getTime()
- {
- return bucketStart;
- }
-
@Override
public void advance()
{
cursorOffset.increment();
// Must call BaseQuery.checkInterrupted() after
cursorOffset.increment(), not before, because
// FilteredOffset.increment() is a potentially long, not an "instant"
operation (unlike to all other subclasses
- // of Offset) and it returns early on interruption, leaving itself in an
illegal state. We should not let
+ // of Offset) and it returns early on interruption, leaving itself in an
illegal We should not let
Review Comment:
the `state.` seems to have been removed erroneously
##########
processing/src/main/java/org/apache/druid/query/vector/VectorCursorGranularizer.java:
##########
@@ -102,6 +71,37 @@ public static VectorCursorGranularizer create(
return new VectorCursorGranularizer(cursor, bucketIterable, timeSelector);
}
+ // And a cursor that has been made from it.
Review Comment:
This comment didn't make sense before, and it still doesn't make sense ;P
##########
processing/src/main/java/org/apache/druid/query/Query.java:
##########
@@ -271,4 +275,24 @@ default Set<String> getRequiredColumns()
{
return null;
}
+
+ default CursorBuildSpec asCursorBuildSpec(@Nullable QueryMetrics<?>
queryMetrics)
+ {
+ final List<Interval> intervals = getIntervals();
+ if (intervals.size() > 1) {
Review Comment:
should be `!= 1`, due to the `Iterables.getOnlyElement` later.
BTW, the error message and only-element retrieval can be combined, if you
like, using `CollectionUtils.getOnlyElement` (which also takes an error
supplier).
##########
processing/src/main/java/org/apache/druid/query/CursorGranularizer.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+public class CursorGranularizer
Review Comment:
This class should have some javadocs explaining its purpose and should have
a `@see` for `VectorCursorGranularizer`. Also the javadocs for
`VectorCursorGranularizer` need to be updated; they still say that
"Nonvectorized engines have it handled for them by the StorageAdapter".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]