This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new af1ad904fad fix time column seeking for time ordered projections
(#17711)
af1ad904fad is described below
commit af1ad904fad800d70ecf844493714d1bdce60f42
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Feb 11 18:31:06 2025 -0800
fix time column seeking for time ordered projections (#17711)
---
.../druid/segment/QueryableIndexCursorFactory.java | 87 ++++++++-------
.../druid/segment/QueryableIndexCursorHolder.java | 17 +--
.../segment/QueryableIndexCursorHolderTest.java | 122 +++++++++++++++++++++
3 files changed, 173 insertions(+), 53 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index d3aa95685aa..f1aa2bb7bf5 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -61,50 +61,53 @@ public class QueryableIndexCursorFactory implements
CursorFactory
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
QueryableProjection<QueryableIndex> projection = index.getProjection(spec);
- if (projection != null) {
- return new QueryableIndexCursorHolder(
- projection.getRowSelector(),
- projection.getCursorBuildSpec(),
- timeBoundaryInspector
+ if (projection == null) {
+ // no projections, create regular cursor holder
+ return new QueryableIndexCursorHolder(index, spec,
timeBoundaryInspector);
+ }
+
+ // create projection cursor holder
+ return new QueryableIndexCursorHolder(
+ projection.getRowSelector(),
+ projection.getCursorBuildSpec(),
+ QueryableIndexTimeBoundaryInspector.create(projection.getRowSelector())
+ )
+ {
+ @Override
+ protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ Offset baseOffset
)
{
- @Override
- protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
- ColumnCache columnCache,
- Offset baseOffset
- )
- {
- return projection.wrapColumnSelectorFactory(
- super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset)
- );
- }
-
- @Override
- protected VectorColumnSelectorFactory
makeVectorColumnSelectorFactoryForOffset(
- ColumnCache columnCache,
- VectorOffset baseOffset
- )
- {
- return projection.wrapVectorColumnSelectorFactory(
- super.makeVectorColumnSelectorFactoryForOffset(columnCache,
baseOffset)
- );
- }
-
- @Override
- public boolean isPreAggregated()
- {
- return true;
- }
-
- @Nullable
- @Override
- public List<AggregatorFactory> getAggregatorsForPreAggregated()
- {
- return projection.getCursorBuildSpec().getAggregators();
- }
- };
- }
- return new QueryableIndexCursorHolder(index,
CursorBuildSpec.builder(spec).build(), timeBoundaryInspector);
+ return projection.wrapColumnSelectorFactory(
+ super.makeColumnSelectorFactoryForOffset(columnCache, baseOffset)
+ );
+ }
+
+ @Override
+ protected VectorColumnSelectorFactory
makeVectorColumnSelectorFactoryForOffset(
+ ColumnCache columnCache,
+ VectorOffset baseOffset
+ )
+ {
+ return projection.wrapVectorColumnSelectorFactory(
+ super.makeVectorColumnSelectorFactoryForOffset(columnCache,
baseOffset)
+ );
+ }
+
+ @Override
+ public boolean isPreAggregated()
+ {
+ return true;
+ }
+
+ @Nullable
+ @Override
+ public List<AggregatorFactory> getAggregatorsForPreAggregated()
+ {
+ return projection.getCursorBuildSpec().getAggregators();
+ }
+ };
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
index b1f0022c39f..b2db4125d8d 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorHolder.java
@@ -178,6 +178,7 @@ public class QueryableIndexCursorHolder implements
CursorHolder
final long timeStart = Math.max(interval.getStartMillis(),
minDataTimestamp);
final long timeEnd = interval.getEndMillis();
+ final Offset offset;
if (timeOrder == Order.ASCENDING) {
for (; baseOffset.withinBounds(); baseOffset.increment()) {
@@ -185,17 +186,6 @@ public class QueryableIndexCursorHolder implements
CursorHolder
break;
}
}
- } else if (timeOrder == Order.DESCENDING) {
- for (; baseOffset.withinBounds(); baseOffset.increment()) {
- if
(resources.getTimestampsColumn().getLongSingleValueRow(baseOffset.getOffset())
< timeEnd) {
- break;
- }
- }
- }
-
- final Offset offset;
-
- if (timeOrder == Order.ASCENDING) {
offset = new AscendingTimestampCheckingOffset(
baseOffset,
resources.getTimestampsColumn(),
@@ -203,6 +193,11 @@ public class QueryableIndexCursorHolder implements
CursorHolder
maxDataTimestamp < timeEnd
);
} else if (timeOrder == Order.DESCENDING) {
+ for (; baseOffset.withinBounds(); baseOffset.increment()) {
+ if
(resources.getTimestampsColumn().getLongSingleValueRow(baseOffset.getOffset())
< timeEnd) {
+ break;
+ }
+ }
offset = new DescendingTimestampCheckingOffset(
baseOffset,
resources.getTimestampsColumn(),
diff --git
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorHolderTest.java
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorHolderTest.java
index 349236a9dcf..dd1f0c6c27f 100644
---
a/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorHolderTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/QueryableIndexCursorHolderTest.java
@@ -19,12 +19,39 @@
package org.apache.druid.segment;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.NumericColumn;
import org.apache.druid.segment.data.ReadableOffset;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
public class QueryableIndexCursorHolderTest
{
@Test
@@ -114,4 +141,99 @@ public class QueryableIndexCursorHolderTest
QueryableIndexCursorHolder.timeSearch(column, 15, 0, values.length)
);
}
+
+ @Test
+ public void testProjectionTimeBoundaryInspector()
+ {
+ final DateTime startTime = DateTimes.nowUtc();
+ final DimensionsSpec dims = DimensionsSpec.builder()
+ .setDimensions(
+ Arrays.asList(
+ new
StringDimensionSchema("a"),
+ new
StringDimensionSchema("b"),
+ new
LongDimensionSchema("c"),
+ new
DoubleDimensionSchema("d")
+ )
+ )
+ .build();
+ File tmp = FileUtils.createTempDir();
+ final Closer closer = Closer.create();
+ closer.register(tmp::delete);
+ final List<InputRow> rows = Arrays.asList(
+ new ListBasedInputRow(
+ CursorFactoryProjectionTest.ROW_SIGNATURE,
+ startTime,
+ CursorFactoryProjectionTest.ROW_SIGNATURE.getColumnNames(),
+ Arrays.asList("a", "aa", 1L, 1.0)
+ ),
+ new ListBasedInputRow(
+ CursorFactoryProjectionTest.ROW_SIGNATURE,
+ startTime.plusMinutes(2),
+ CursorFactoryProjectionTest.ROW_SIGNATURE.getColumnNames(),
+ Arrays.asList("a", "bb", 1L, 1.1, 1.1f)
+ )
+ );
+ IndexBuilder bob = IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dims)
+ .withRollup(false)
+
.withMinTimestamp(startTime.getMillis())
+ .withProjections(
+
Collections.singletonList(
+ new
AggregateProjectionSpec(
+
"ab_hourly_cd_sum_time_ordered",
+
VirtualColumns.create(
+
Granularities.toVirtualColumn(
+
Granularities.HOUR,
+
"__gran"
+ )
+ ),
+
Arrays.asList(
+
new LongDimensionSchema("__gran"),
+
new StringDimensionSchema("a"),
+
new StringDimensionSchema("b")
+ ),
+ new
AggregatorFactory[]{
+
new LongSumAggregatorFactory(
+
"_c_sum",
+
"c"
+ ),
+
new DoubleSumAggregatorFactory("d", "d")
+ }
+ )
+ )
+ )
+ .build()
+ )
+ .rows(rows);
+
+ try (QueryableIndex index = bob.buildMMappedIndex()) {
+ CursorBuildSpec buildSpec = CursorBuildSpec.builder()
+
.setGroupingColumns(ImmutableList.of("a", "b"))
+
.setPhysicalColumns(ImmutableSet.of("a", "b"))
+ .setAggregators(
+ ImmutableList.of(
+ new
LongSumAggregatorFactory("c_sum", "c")
+ )
+ )
+
.setQueryContext(QueryContext.of(ImmutableMap.of(QueryContexts.FORCE_PROJECTION,
true)))
+ .build();
+ final CursorFactory cursorFactory = new
QueryableIndexCursorFactory(index);
+
+ try (final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(buildSpec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(2, rowCount);
+ }
+ }
+ finally {
+ CloseableUtils.closeAndWrapExceptions(closer);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]