gianm commented on code in PR #16533:
URL: https://github.com/apache/druid/pull/16533#discussion_r1702047874


##########
extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java:
##########
@@ -54,7 +57,8 @@ public void setUp()
   @Test
   public void testRead()
   {
-    try (final VectorCursor cursor = makeCursor()) {
+    try (final CursorMaker maker = makeCursor()) {

Review Comment:
   Should this be named `makeCursorMaker()`, since it now returns a `CursorMaker`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -294,15 +289,22 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
         final Frame frame = inputChannel.read();
         final FrameSegment frameSegment = new FrameSegment(frame, 
inputFrameReader, SegmentId.dummy("scan"));
 
-        final long rowsFlushed = setNextCursor(
-            Iterables.getOnlyElement(
-                makeCursors(
-                    query.withQuerySegmentSpec(new 
MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
-                    mapSegment(frameSegment).asStorageAdapter()
-                ).toList()
-            ),
-            frameSegment
-        );
+        final StorageAdapter adapter = 
mapSegment(frameSegment).asStorageAdapter();
+        if (adapter == null) {
+          throw new ISE(
+              "Null storage adapter found. Probably trying to issue a query 
against a segment being memory unmapped."
+          );
+        }
+
+        final CursorMaker maker = 
closer.register(adapter.asCursorMaker(query.asCursorBuildSpec(null)));
+        final Cursor nextCursor = maker.makeCursor();
+
+        if (nextCursor == null) {
+          // no cursor
+          maker.close();

Review Comment:
   `maker` is already registered with the `closer`, so I think there's no need 
to do `maker.close()` here. The old code hadn't registered the `cursorYielder` 
at this point.



##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+  public static final CursorBuildSpec FULL_SCAN = 
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+  public static CursorBuildSpecBuilder builder()
+  {
+    return new CursorBuildSpecBuilder();
+  }
+
+  public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+  {
+    return new CursorBuildSpecBuilder(spec);
+  }
+
+  @Nullable
+  private final Filter filter;
+  private final Interval interval;
+  private final Granularity granularity;
+  @Nullable
+  private final List<String> groupingColumns;
+  private final VirtualColumns virtualColumns;
+  @Nullable
+  private final List<AggregatorFactory> aggregators;
+
+  private final QueryContext queryContext;
+
+  private final boolean descending;
+  @Nullable
+  private final QueryMetrics<?> queryMetrics;
+
+  public CursorBuildSpec(
+      @Nullable Filter filter,
+      Interval interval,
+      Granularity granularity,
+      @Nullable List<String> groupingColumns,
+      VirtualColumns virtualColumns,
+      @Nullable List<AggregatorFactory> aggregators,
+      QueryContext queryContext,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    this.filter = filter;
+    this.interval = interval;
+    this.granularity = granularity;
+    this.groupingColumns = groupingColumns;
+    this.virtualColumns = virtualColumns;
+    this.aggregators = aggregators;
+    this.descending = descending;
+    this.queryContext = queryContext;
+    this.queryMetrics = queryMetrics;
+  }
+
+  @Nullable
+  public Filter getFilter()
+  {
+    return filter;
+  }
+
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  public Granularity getGranularity()
+  {
+    return granularity;
+  }
+
+  @Nullable
+  public List<String> getGroupingColumns()
+  {
+    return groupingColumns;
+  }
+
+  public VirtualColumns getVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
+  @Nullable
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
+
+  public boolean isDescending()

Review Comment:
   Since we're changing the interfaces, it would be better to drop `descending` and replace
it with `sortOrder`. That would be useful for projections, since different projections
might have different sort orders.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -252,21 +245,23 @@ protected ReturnOrAwait<Unit> runWithSegment(final 
SegmentWithDescriptor segment
     if (cursor == null) {
       final ResourceHolder<Segment> segmentHolder = 
closer.register(segment.getOrLoad());
 
-      final Yielder<Cursor> cursorYielder = Yielders.each(
-          makeCursors(
-              query.withQuerySegmentSpec(new 
SpecificSegmentSpec(segment.getDescriptor())),
-              mapSegment(segmentHolder.get()).asStorageAdapter()
-          )
-      );
+      final StorageAdapter adapter = 
mapSegment(segmentHolder.get()).asStorageAdapter();
+      if (adapter == null) {
+        throw new ISE(
+            "Null storage adapter found. Probably trying to issue a query 
against a segment being memory unmapped."
+        );
+      }
 
-      if (cursorYielder.isDone()) {
+      final CursorMaker maker = 
closer.register(adapter.asCursorMaker(query.asCursorBuildSpec(null)));
+      final Cursor nextCursor = maker.makeCursor();
+
+      if (nextCursor == null) {
         // No cursors!
-        cursorYielder.close();
+        maker.close();

Review Comment:
   `maker` is already registered with the `closer`, so I think there's no need 
to do `maker.close()` here. The old code hadn't registered the `cursorYielder` 
at this point.



##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+  public static final CursorBuildSpec FULL_SCAN = 
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+  public static CursorBuildSpecBuilder builder()
+  {
+    return new CursorBuildSpecBuilder();
+  }
+
+  public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+  {
+    return new CursorBuildSpecBuilder(spec);
+  }
+
+  @Nullable
+  private final Filter filter;
+  private final Interval interval;
+  private final Granularity granularity;
+  @Nullable
+  private final List<String> groupingColumns;
+  private final VirtualColumns virtualColumns;
+  @Nullable
+  private final List<AggregatorFactory> aggregators;
+
+  private final QueryContext queryContext;
+
+  private final boolean descending;
+  @Nullable
+  private final QueryMetrics<?> queryMetrics;
+
+  public CursorBuildSpec(
+      @Nullable Filter filter,
+      Interval interval,
+      Granularity granularity,
+      @Nullable List<String> groupingColumns,
+      VirtualColumns virtualColumns,
+      @Nullable List<AggregatorFactory> aggregators,
+      QueryContext queryContext,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    this.filter = filter;
+    this.interval = interval;
+    this.granularity = granularity;
+    this.groupingColumns = groupingColumns;
+    this.virtualColumns = virtualColumns;
+    this.aggregators = aggregators;
+    this.descending = descending;
+    this.queryContext = queryContext;
+    this.queryMetrics = queryMetrics;
+  }
+
+  @Nullable
+  public Filter getFilter()
+  {
+    return filter;
+  }
+
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  public Granularity getGranularity()
+  {
+    return granularity;
+  }
+
+  @Nullable
+  public List<String> getGroupingColumns()
+  {
+    return groupingColumns;
+  }
+
+  public VirtualColumns getVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
+  @Nullable
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
+
+  public boolean isDescending()
+  {
+    return descending;
+  }
+
+  public QueryContext getQueryContext()
+  {
+    return queryContext;
+  }
+
+  @Nullable
+  public QueryMetrics<?> getQueryMetrics()
+  {
+    return queryMetrics;
+  }
+
+  public static class CursorBuildSpecBuilder
+  {
+    @Nullable
+    private Filter filter;
+    private Interval interval = Intervals.ETERNITY;
+    private Granularity granularity = Granularities.NONE;
+
+    @Nullable
+    private List<String> groupingColumns = null;
+    private VirtualColumns virtualColumns = VirtualColumns.EMPTY;
+    @Nullable
+    private List<AggregatorFactory> aggregators = null;
+    private boolean descending = false;
+
+    private QueryContext queryContext = QueryContext.empty();
+    @Nullable
+    private QueryMetrics<?> queryMetrics;
+
+    private CursorBuildSpecBuilder()
+    {
+      //

Review Comment:
   This `//` comment is empty. It should probably say something like `// Do nothing.`



##########
processing/src/main/java/org/apache/druid/query/FrameBasedInlineDataSource.java:
##########
@@ -81,18 +82,22 @@ public RowSignature getRowSignature()
 
   public Sequence<Object[]> getRowsAsSequence()
   {
-
+    final Closer closer = Closer.create();
     final Sequence<Cursor> cursorSequence =
         Sequences.simple(frames)
-                 .flatMap(
+                 .map(

Review Comment:
   IMO, for closeable management it's better to use a `flatMap` over single-cursor
sequences that each call `withBaggage(maker)`. That way each maker is closed as soon
as we're done with it, instead of all of them being closed at the end.
   
   It doesn't really matter for frames right now, but I'm thinking ahead to cases
where it might matter in the future.



##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorMaker.java:
##########
@@ -668,4 +724,43 @@
       return new DescendingTimestampCheckingOffset(baseOffset.clone(), 
timestamps, timeLimit, allWithinThreshold);
     }
   }
+
+  private final class CursorResources implements Closeable

Review Comment:
   seems like a good point ☝️ 



##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
  */
 public interface Cursor
 {
+  /**
+   * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the 
row values at the current position of
+   * the cursor
+   */
   ColumnSelectorFactory getColumnSelectorFactory();
-  DateTime getTime();
+
+  /**
+   * Advance to the next row in the cursor, checking if thread has been 
interrupted after advancing and possibly
+   * throwing {@link QueryInterruptedException} if so
+   */
   void advance();
+
+  /**
+   * Advance to the next row in the cursor
+   */
   void advanceUninterruptibly();
+
+  /**
+   * Check if there are any additional rows in the cursor
+   */
   boolean isDone();
+
+  /**
+   * Check if there are any additional rows in the cursor, or if the thread 
has been interrupted
+   */
   boolean isDoneOrInterrupted();
+
+  /**
+   * Mark a position on the cursor which can recalled with {@link 
#resetToMark()}
+   */
+  void mark(DateTime mark);
+
+  /**
+   * Reset to position set by {@link #mark(DateTime)}
+   */
+  void resetToMark();
+
+  /**
+   * Reset to start of cursor

Review Comment:
   Should specify that this also clears the mark.



##########
processing/src/main/java/org/apache/druid/segment/CursorMaker.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.vector.VectorCursor;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+public interface CursorMaker extends Closeable

Review Comment:
   I wonder if `CursorHolder` would be a better name than `CursorMaker`. Using
"holder" rather than "maker" better emphasizes that the `CursorMaker`
(or `CursorHolder`) is closeable — it suggests a container that wraps a
`Cursor` or `VectorCursor` plus the resources used by the cursor. It also avoids
the awkward name `CursorMakerFactory`.
   
   If we did that, I think we'd also change:
   
   - `makeCursor` -> `asCursor()` or `getCursor()`
   - `makeVectorCursor` -> `asVectorCursor()` or `getVectorCursor()`
   - `CursorMakerFactory` -> `CursorHolderFactory` or even stick with 
`CursorFactory`
   - `CursorMakerFactory#asCursorMaker` -> `#makeCursorHolder`
   - possibly update `CursorMaker` (`CursorHolder`) implementations to stash
the cursor in a field, so that `asCursor()` or `getCursor()` can be called
multiple times and return the same cursor. This would let query engines
pass around the `CursorHolder` as a single `Closeable` that also holds the actual
cursor, which could simplify the logic for making sure that everything is
closed properly.



##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+  public static final CursorBuildSpec FULL_SCAN = 
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+  public static CursorBuildSpecBuilder builder()
+  {
+    return new CursorBuildSpecBuilder();
+  }
+
+  public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+  {
+    return new CursorBuildSpecBuilder(spec);
+  }
+
+  @Nullable
+  private final Filter filter;
+  private final Interval interval;
+  private final Granularity granularity;
+  @Nullable
+  private final List<String> groupingColumns;
+  private final VirtualColumns virtualColumns;
+  @Nullable
+  private final List<AggregatorFactory> aggregators;
+
+  private final QueryContext queryContext;
+
+  private final boolean descending;
+  @Nullable
+  private final QueryMetrics<?> queryMetrics;
+
+  public CursorBuildSpec(
+      @Nullable Filter filter,
+      Interval interval,
+      Granularity granularity,
+      @Nullable List<String> groupingColumns,
+      VirtualColumns virtualColumns,
+      @Nullable List<AggregatorFactory> aggregators,
+      QueryContext queryContext,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    this.filter = filter;
+    this.interval = interval;
+    this.granularity = granularity;
+    this.groupingColumns = groupingColumns;
+    this.virtualColumns = virtualColumns;
+    this.aggregators = aggregators;
+    this.descending = descending;
+    this.queryContext = queryContext;
+    this.queryMetrics = queryMetrics;
+  }
+
+  @Nullable
+  public Filter getFilter()
+  {
+    return filter;
+  }
+
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  public Granularity getGranularity()

Review Comment:
   Something is odd with the API design here. With the changes to cursor 
sequences, I would think that granularity should no longer be a concept that 
the cursors need to think about. It should be something that is dealt with at 
the query engine level. In fact, `getGranularity()` doesn't appear to be used 
by the `CursorMaker` implementations, except sometimes to throw exceptions 
saying that granularities other than `ALL` aren't supported.
   
   It may be that this is here to support future work with projections. If 
that's the reason for having it, I would recommend we _don't_ use `granularity` 
in determining what projections to use; better to use expressions or virtual 
columns. Like, rather than encoding that a projection has `granularity: hour`, 
we should encode that it has a dimension `timestamp_floor(__time, 'PT1H')`. The 
`granularity` concept involves too much weirdness and it's best to have it 
totally outside the cursor logic.



##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
  */
 public interface Cursor
 {
+  /**
+   * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the 
row values at the current position of
+   * the cursor
+   */
   ColumnSelectorFactory getColumnSelectorFactory();
-  DateTime getTime();
+
+  /**
+   * Advance to the next row in the cursor, checking if thread has been 
interrupted after advancing and possibly
+   * throwing {@link QueryInterruptedException} if so
+   */
   void advance();
+
+  /**
+   * Advance to the next row in the cursor
+   */
   void advanceUninterruptibly();
+
+  /**
+   * Check if there are any additional rows in the cursor
+   */
   boolean isDone();
+
+  /**
+   * Check if there are any additional rows in the cursor, or if the thread 
has been interrupted
+   */
   boolean isDoneOrInterrupted();
+
+  /**
+   * Mark a position on the cursor which can recalled with {@link 
#resetToMark()}
+   */
+  void mark(DateTime mark);

Review Comment:
   This should include more info on the `mark` parameter. It appears to be used 
by `IncrementalIndexCursor` and `RowBasedCursor`. But other popular 
implementations (`FrameCursor`, `QueryableIndexCursor`) ignore it. Query 
engines need to be written in such a way that they work properly on all cursor 
types, so we should have some commentary on how to properly use this method, 
and what behavioral guarantees apply.



##########
processing/src/main/java/org/apache/druid/segment/CursorMaker.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.vector.VectorCursor;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+public interface CursorMaker extends Closeable
+{
+  /**
+   * Create a {@link Cursor} for use with non-vectorized query engines.
+   */
+  @Nullable
+  Cursor makeCursor();
+
+  /**
+   * Create a {@link VectorCursor} for use with vectorized query engines.
+   */
+  @Nullable
+  default VectorCursor makeVectorCursor()
+  {
+    throw new UOE("Cannot vectorize. Check 'canVectorize' before calling 
'makeVectorCursor' on %s.", this.getClass().getName());
+  }
+
+  /**
+   * Returns true if this {@link CursorMaker} supports calling {@link 
#makeVectorCursor()}.
+   */
+  default boolean canVectorize()
+  {
+    return false;
+  }
+
+  /**
+   * Release any resources acquired by cursors.
+   */
+  @Override
+  default void close()
+  {
+    // nothing to close
+  }
+
+  CursorMaker EMPTY = new CursorMaker()
+  {
+    @Override
+    public boolean canVectorize()
+    {
+      return true;
+    }
+
+    @Override
+    public Cursor makeCursor()

Review Comment:
   This override should also be annotated `@Nullable`, matching `CursorMaker#makeCursor()`.



##########
processing/src/main/java/org/apache/druid/query/CursorGranularizer.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+public class CursorGranularizer
+{
+  @Nullable
+  public static CursorGranularizer create(
+      final StorageAdapter storageAdapter,
+      final Cursor cursor,
+      final Granularity granularity,
+      final Interval queryInterval,
+      final boolean descending
+  )
+  {
+    final DateTime minTime = storageAdapter.getMinTime();
+    final DateTime maxTime = storageAdapter.getMaxTime();
+
+    final Interval storageAdapterInterval = new Interval(minTime, 
granularity.bucketEnd(maxTime));
+    final Interval clippedQueryInterval = 
queryInterval.overlap(storageAdapterInterval);
+
+    if (clippedQueryInterval == null) {
+      return null;
+    }
+
+    Iterable<Interval> bucketIterable = 
granularity.getIterable(clippedQueryInterval);
+    if (descending) {
+      bucketIterable = Lists.reverse(ImmutableList.copyOf(bucketIterable));
+    }
+    final Interval firstBucket = 
granularity.bucket(clippedQueryInterval.getStart());
+
+    final ColumnValueSelector timeSelector;
+    if (firstBucket.contains(clippedQueryInterval)) {
+      // Only one bucket, no need to read the time column.
+      assert Iterables.size(bucketIterable) == 1;
+      timeSelector = null;
+    } else {
+      // Multiple buckets, need to read the time column to know when we move 
from one to the next.
+      timeSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+    }
+
+    return new CursorGranularizer(cursor, bucketIterable, timeSelector, 
descending);
+  }
+
+  // And a cursor that has been made from it.

Review Comment:
   This comment doesn't make sense.



##########
processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.Filter;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class CursorBuildSpec
+{
+  public static final CursorBuildSpec FULL_SCAN = 
CursorBuildSpec.builder().setGranularity(Granularities.ALL).build();
+
+  public static CursorBuildSpecBuilder builder()
+  {
+    return new CursorBuildSpecBuilder();
+  }
+
+  public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)
+  {
+    return new CursorBuildSpecBuilder(spec);
+  }
+
+  @Nullable
+  private final Filter filter;
+  private final Interval interval;
+  private final Granularity granularity;
+  @Nullable
+  private final List<String> groupingColumns;
+  private final VirtualColumns virtualColumns;
+  @Nullable
+  private final List<AggregatorFactory> aggregators;
+
+  private final QueryContext queryContext;
+
+  private final boolean descending;
+  @Nullable
+  private final QueryMetrics<?> queryMetrics;
+
+  public CursorBuildSpec(
+      @Nullable Filter filter,
+      Interval interval,
+      Granularity granularity,
+      @Nullable List<String> groupingColumns,
+      VirtualColumns virtualColumns,
+      @Nullable List<AggregatorFactory> aggregators,
+      QueryContext queryContext,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    this.filter = filter;
+    this.interval = interval;
+    this.granularity = granularity;
+    this.groupingColumns = groupingColumns;
+    this.virtualColumns = virtualColumns;
+    this.aggregators = aggregators;
+    this.descending = descending;
+    this.queryContext = queryContext;
+    this.queryMetrics = queryMetrics;
+  }
+
+  @Nullable
+  public Filter getFilter()
+  {
+    return filter;
+  }
+
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  public Granularity getGranularity()
+  {
+    return granularity;
+  }
+
+  @Nullable
+  public List<String> getGroupingColumns()
+  {
+    return groupingColumns;
+  }
+
+  public VirtualColumns getVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
+  @Nullable
+  public List<AggregatorFactory> getAggregators()
+  {
+    return aggregators;
+  }
+
+  public boolean isDescending()
+  {
+    return descending;
+  }
+
+  public QueryContext getQueryContext()
+  {
+    return queryContext;
+  }
+
+  @Nullable
+  public QueryMetrics<?> getQueryMetrics()
+  {
+    return queryMetrics;
+  }
+
+  public static class CursorBuildSpecBuilder
+  {
+    @Nullable
+    private Filter filter;
+    private Interval interval = Intervals.ETERNITY;
+    private Granularity granularity = Granularities.NONE;
+
+    @Nullable
+    private List<String> groupingColumns = null;

Review Comment:
   nit: there's no need for `= null`; fields are initialized to `null` by default
(as with `filter`).



##########
processing/src/main/java/org/apache/druid/segment/Cursor.java:
##########
@@ -35,11 +36,45 @@
  */
 public interface Cursor
 {
+  /**
+   * Get a {@link ColumnSelectorFactory} whose selectors will be backed by the 
row values at the current position of
+   * the cursor
+   */
   ColumnSelectorFactory getColumnSelectorFactory();
-  DateTime getTime();
+
+  /**
+   * Advance to the next row in the cursor, checking if thread has been 
interrupted after advancing and possibly
+   * throwing {@link QueryInterruptedException} if so
+   */
   void advance();
+
+  /**
+   * Advance to the next row in the cursor
+   */
   void advanceUninterruptibly();
+
+  /**
+   * Check if there are any additional rows in the cursor

Review Comment:
   IMO the meaning of `isDone` is a little different from what this text suggests. The text makes it sound like `Iterator#hasNext`, but the behavior is subtly different. The `isDone` method does not check whether there are additional rows after the current cursor position; it checks whether the current cursor position is valid. That is, the expected usage is:
   
   ```
   while (!cursor.isDone()) {
     // do stuff
     cursor.advance();
   }
   ```
   
   Rather than the typical iterator pattern:
   
   ```
   while (iter.hasNext()) {
     T obj = iter.next();
     // do stuff
   }
   ```



##########
processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorMaker.java:
##########
@@ -483,19 +471,13 @@ public ColumnSelectorFactory getColumnSelectorFactory()
       return columnSelectorFactory;
     }
 
-    @Override
-    public DateTime getTime()
-    {
-      return bucketStart;
-    }
-
     @Override
     public void advance()
     {
       cursorOffset.increment();
       // Must call BaseQuery.checkInterrupted() after 
cursorOffset.increment(), not before, because
       // FilteredOffset.increment() is a potentially long, not an "instant" 
operation (unlike to all other subclasses
-      // of Offset) and it returns early on interruption, leaving itself in an 
illegal state. We should not let
+      // of Offset) and it returns early on interruption, leaving itself in an 
illegal  We should not let

Review Comment:
   The word `state.` seems to have been removed erroneously; the comment should still read "leaving itself in an illegal state. We should not let".



##########
processing/src/main/java/org/apache/druid/query/vector/VectorCursorGranularizer.java:
##########
@@ -102,6 +71,37 @@ public static VectorCursorGranularizer create(
     return new VectorCursorGranularizer(cursor, bucketIterable, timeSelector);
   }
 
+  // And a cursor that has been made from it.

Review Comment:
   This comment didn't make sense before, and it still doesn't make sense ;P



##########
processing/src/main/java/org/apache/druid/query/Query.java:
##########
@@ -271,4 +275,24 @@ default Set<String> getRequiredColumns()
   {
     return null;
   }
+
+  default CursorBuildSpec asCursorBuildSpec(@Nullable QueryMetrics<?> 
queryMetrics)
+  {
+    final List<Interval> intervals = getIntervals();
+    if (intervals.size() > 1) {

Review Comment:
   This should be `!= 1`, because of the `Iterables.getOnlyElement` call later (which fails on zero intervals too).
   
   BTW, if you like, the error message and the only-element retrieval can be combined by using `CollectionUtils.getOnlyElement`, which also takes an error supplier.



##########
processing/src/main/java/org/apache/druid/query/CursorGranularizer.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+
+public class CursorGranularizer

Review Comment:
   This class should have some Javadoc explaining its purpose, and it should have a `@see` for `VectorCursorGranularizer`. Also, the Javadoc for `VectorCursorGranularizer` needs to be updated; it still says that "Nonvectorized engines have it handled for them by the StorageAdapter".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to