clintropolis commented on code in PR #16849:
URL: https://github.com/apache/druid/pull/16849#discussion_r1706220824
##########
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java:
##########
@@ -918,4 +940,84 @@ public void clear()
facts.clear();
}
}
+
+ static final class PlainNonTimeOrderedFactsHolder implements FactsHolder
+ {
+ private final Deque<IncrementalIndexRow> facts;
+ private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
+ private volatile long minTime = DateTimes.MAX.getMillis();
+ private volatile long maxTime = DateTimes.MIN.getMillis();
+
+ public PlainNonTimeOrderedFactsHolder(Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
+ {
+ this.facts = new ArrayDeque<>();
+ this.incrementalIndexRowComparator = incrementalIndexRowComparator;
+ }
+
+ @Override
+ public int getPriorIndex(IncrementalIndexRow key)
+ {
+ // always return EMPTY_ROW_INDEX to indicate that there is no prior key, because we always add a new row
+ return IncrementalIndexRow.EMPTY_ROW_INDEX;
+ }
+
+ @Override
+ public long getMinTimeMillis()
+ {
+ return minTime;
+ }
+
+ @Override
+ public long getMaxTimeMillis()
+ {
+ return maxTime;
+ }
+
+ @Override
+ public Iterator<IncrementalIndexRow> iterator(boolean descending)
Review Comment:
descending is pretty tightly coupled with time ordering, but I guess it's harmless to implement
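For what it's worth, supporting descending here is nearly free with a `Deque`; a minimal sketch (hypothetical names, not the PR's exact code) of how both directions fall out of `ArrayDeque`:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class DequeIteratorSketch
{
  // ArrayDeque hands us both directions for free: iterator() walks insertion
  // order, descendingIterator() walks the reverse, both without copying.
  static <T> Iterator<T> iterator(Deque<T> facts, boolean descending)
  {
    return descending ? facts.descendingIterator() : facts.iterator();
  }

  public static void main(String[] args)
  {
    Deque<Long> facts = new ArrayDeque<>();
    facts.add(1L);
    facts.add(2L);
    facts.add(3L);
    // Walks the facts in reverse insertion order.
    iterator(facts, true).forEachRemaining(System.out::println);
  }
}
```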
##########
processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java:
##########
@@ -777,11 +790,18 @@ public Iterator<IncrementalIndexRow> iterator(boolean descending)
@Override
public Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{
- IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList);
- IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList);
- ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap = facts.subMap(start, end);
- ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = descending ? subMap.descendingMap() : subMap;
- return rangeMap.keySet();
+ if (timeOrdered) {
+ IncrementalIndexRow start = new IncrementalIndexRow(timeStart, new Object[]{}, dimensionDescsList);
+ IncrementalIndexRow end = new IncrementalIndexRow(timeEnd, new Object[]{}, dimensionDescsList);
+ ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow> subMap = facts.subMap(start, end);
+ ConcurrentMap<IncrementalIndexRow, IncrementalIndexRow> rangeMap = descending ? subMap.descendingMap() : subMap;
+ return rangeMap.keySet();
+ } else {
+ return Iterables.filter(
Review Comment:
I know we need this right now, but I'm not sure we will need it after #16533. timeRangeIterable is primarily used to support query granularity buckets in topN (in my branch, to support mark/resetToMark to move the cursor in the facts table to the correct granularity bucket without having to advance the cursor directly). I think we could just use `iterator`, or expose an alternative iterable for the incremental index cursor, if we aren't requesting time ordering
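A rough sketch of what the non-time-ordered branch amounts to: a linear filter over the facts by timestamp instead of a navigable sub-map (the `Row` stand-in and method names here are hypothetical, not the PR's code):

```java
import java.util.List;
import java.util.stream.Collectors;

public class TimeRangeFilterSketch
{
  // Stand-in for IncrementalIndexRow; only the timestamp matters here.
  static final class Row
  {
    final long timestamp;

    Row(long timestamp)
    {
      this.timestamp = timestamp;
    }
  }

  // Equivalent of an Iterables.filter over the facts: keep rows with
  // timeStart <= timestamp < timeEnd, without assuming any ordering.
  static List<Row> timeRange(List<Row> facts, long timeStart, long timeEnd)
  {
    return facts.stream()
                .filter(row -> row.timestamp >= timeStart && row.timestamp < timeEnd)
                .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<Row> facts = List.of(new Row(5), new Row(15), new Row(25));
    // Only the row at timestamp 15 falls in [10, 20).
    System.out.println(timeRange(facts, 10, 20).size());
  }
}
```

Note this is O(n) per bucket rather than O(log n) for the sub-map lookup, which is part of why skipping it entirely when time ordering isn't requested seems appealing.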
##########
processing/src/main/java/org/apache/druid/segment/Metadata.java:
##########
@@ -100,6 +112,14 @@ public Boolean isRollup()
return rollup;
}
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public List<String> getSortOrder()
+ {
+ return sortOrder;
+ }
+
Review Comment:
I know it cannot be specified during ingest with the current mechanisms, but it seems like we should include the direction of the ordering as well. Maybe re-use the scan query ordering?
##########
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java:
##########
@@ -295,6 +300,16 @@ public Metadata getMetadata()
return index.getMetadata();
}
+ @Override
+ public List<String> getSortOrder()
+ {
+ if (index.timePosition == 0) {
+ return Metadata.SORTED_BY_TIME_ONLY;
+ } else {
+ return Collections.emptyList();
Review Comment:
I suppose that depending on the type of facts holder we could actually report something here (rollup should be ordered, I think?)
##########
processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java:
##########
@@ -334,6 +349,17 @@ protected IncrementalIndex(
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
+
+ // Set metadata last, so dimensionOrder is populated
+ final List<String> dimensionOrder = getDimensionOrder();
+ this.metadata = new Metadata(
+ null,
+ getCombiningAggregators(metrics),
+ incrementalIndexSchema.getTimestampSpec(),
+ this.gran,
+ this.rollup,
+ ColumnHolder.TIME_COLUMN_NAME.equals(Iterables.getFirst(dimensionOrder, null)) ? null : dimensionOrder
Review Comment:
Maybe we should always write this out instead of leaving it null? I guess it is done like this to make it easier to fill in for older segments and for new segments ordered by time first?
##########
processing/src/main/java/org/apache/druid/segment/StorageAdapter.java:
##########
@@ -129,6 +141,18 @@ default RowSignature getRowSignature()
@Nullable
Metadata getMetadata();
+ /**
+  * Returns column names that this adapter's data is sorted by. Cursors returned by this adapter return rows in
+  * this ordering, using the natural comparator for the type of the column as returned by
+  * {@link #getColumnCapabilities(String)}. Includes {@link ColumnHolder#TIME_COLUMN_NAME} if appropriate.
+  */
+ List<String> getSortOrder();
+
+ default boolean isTimeOrdered()
+ {
+ return ColumnHolder.TIME_COLUMN_NAME.equals(Iterables.getFirst(getSortOrder(), null));
+ }
Review Comment:
In the context of #16533, this should probably live on `CursorHolder` instead of `StorageAdapter`. It will require a bunch of tests to make and dispose of a cursor holder to check their query against the sort order, but otherwise shouldn't be very disruptive. I wonder if we should include a direction similar to that PR though, maybe re-using the scan query ordering so that these two changes are compatible?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -620,10 +620,18 @@ public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToS
if (storeCompactionState) {
TuningConfig tuningConfig = ingestionSpec.getTuningConfig();
GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
- // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them
- DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null
- ? null
- : new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions());
+ DimensionsSpec dimensionsSpec;
+ if (ingestionSpec.getDataSchema().getDimensionsSpec() == null) {
+ dimensionsSpec = null;
+ } else {
+ // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them
Review Comment:
nit: this comment is slightly misleading after #15321, since compaction does retain spatial dimensions; they just get translated into the normal dimensions list via the handler instead of the separate, deprecated spatialDimensions list
##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -256,6 +256,23 @@ public Metadata getMetadata()
return baseAdapter.getMetadata();
}
+ @Override
+ public List<String> getSortOrder()
+ {
+ final List<String> baseSortOrder = baseAdapter.getSortOrder();
+
+ // Sorted the same way as the base segment, unless the unnested column shadows one of the base columns.
Review Comment:
Given that we will basically always be reading the unnested column, I guess the main expected utility of this will be when ordering by one of the columns that is not being unnested?
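If I'm reading the shadowing logic right, the idea would be to truncate the base sort order at the first shadowed column, since that column (and everything sorted after it) no longer arrives in base order once unnested. A hypothetical sketch under that assumption (names are mine, not the PR's):

```java
import java.util.ArrayList;
import java.util.List;

public class UnnestSortOrderSketch
{
  // Keep the base sort order only up to (and excluding) the first column
  // shadowed by the unnested output column; columns before it are still
  // sorted as in the base segment.
  static List<String> sortOrder(List<String> baseSortOrder, String outputColumnName)
  {
    final List<String> result = new ArrayList<>();
    for (String column : baseSortOrder) {
      if (column.equals(outputColumnName)) {
        break;
      }
      result.add(column);
    }
    return result;
  }
}
```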
##########
processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java:
##########
@@ -167,6 +168,15 @@ public Metadata getMetadata()
throw new UnsupportedOperationException("Cannot retrieve metadata");
}
+ @Override
+ public List<String> getSortOrder()
+ {
+ // It's possibly incorrect in some cases for sort order to be SORTED_BY_TIME_ONLY here, but for historical reasons,
+ // we're keeping this in place for now. The handling of "interval" in "makeCursors", which has been in place for
+ // some time, suggests we think the data is always sorted by time.
Review Comment:
Is this referring to the `RowWalker` with its `skipToDateTime`?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2266,7 +2300,12 @@ private static void populateDimensionsAndAggregators(
QueryContext context
)
{
- if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
+ if (ColumnHolder.TIME_COLUMN_NAME.equals(outputColumn)) {
+ if (!type.is(ValueType.LONG)) {
+ throw DruidException.defensive("Incorrect type[%s] for column[%s]", type, outputColumn);
+ }
+ dimensions.add(new LongDimensionSchema(outputColumn));
Review Comment:
I wonder if this should use `AutoTypeColumnSchema` with `castToType` set to `LONG` until we make a version of `LongDimensionSchema` with the features of the auto columns, since indexes on the time column might be useful.
I guess I should get around to bringing some of the features of the auto columns to the classic dimension schemas, or at least re-branding `AutoTypeColumnSchema` when `castToType` is set to something like `BuiltInTypeColumnSchema` so it looks less weird...
##########
processing/src/main/java/org/apache/druid/segment/Metadata.java:
##########
@@ -233,6 +260,39 @@ public String toString()
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
+ ", sortOrder=" + sortOrder +
'}';
}
+
+ @Nullable
+ private static List<String> mergeSortOrders(List<List<String>> sortOrdersToMerge)
+ {
+ if (sortOrdersToMerge.isEmpty()) {
+ return null;
+ }
+
+ if (sortOrdersToMerge.stream().anyMatch(Objects::isNull)) {
+ return null;
+ }
+
+ final List<String> mergedSortOrder = new ArrayList<>();
+
+ while (true) {
+ String column = null;
+
+ for (final List<String> sortOrder : sortOrdersToMerge) {
+ if (mergedSortOrder.size() >= sortOrder.size()) {
Review Comment:
Does this happen from columns with all null values or something? Maybe this method could use some comments to make the rationale behind the decisions clearer
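For reference, my reading of the merge is a longest-common-prefix computation, where a shorter input also truncates the result (which would explain the `mergedSortOrder.size() >= sortOrder.size()` check). A commented, standalone sketch under that assumption, not the PR's exact code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class MergeSortOrdersSketch
{
  // The merged order is only valid up to the point where every input agrees:
  // a longest common prefix. A shorter input (a segment sorted by fewer
  // columns) ends the prefix early, since nothing is known past its end.
  static List<String> mergeSortOrders(List<List<String>> sortOrdersToMerge)
  {
    if (sortOrdersToMerge.isEmpty() || sortOrdersToMerge.stream().anyMatch(Objects::isNull)) {
      // Unknown sort order anywhere means unknown sort order overall.
      return null;
    }

    final List<String> merged = new ArrayList<>();

    while (true) {
      String column = null;
      for (List<String> sortOrder : sortOrdersToMerge) {
        if (merged.size() >= sortOrder.size()) {
          // Ran off the end of a shorter sort order: stop extending the prefix.
          return merged;
        }
        final String candidate = sortOrder.get(merged.size());
        if (column == null) {
          column = candidate;
        } else if (!column.equals(candidate)) {
          // Inputs disagree at this position: the common prefix ends here.
          return merged;
        }
      }
      merged.add(column);
    }
  }
}
```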
##########
processing/src/main/java/org/apache/druid/segment/Metadata.java:
##########
@@ -233,6 +260,39 @@ public String toString()
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
+ ", sortOrder=" + sortOrder +
'}';
}
+
+ @Nullable
+ private static List<String> mergeSortOrders(List<List<String>> sortOrdersToMerge)
+ {
+ if (sortOrdersToMerge.isEmpty()) {
+ return null;
+ }
+
+ if (sortOrdersToMerge.stream().anyMatch(Objects::isNull)) {
+ return null;
Review Comment:
Should this be an error if they aren't all null? Seems like the sort order should be pretty consistent across indexable adapters...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]