This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 12e31b24146 feat: persist __time min/max in V10 ProjectionMetadata
(#19398)
12e31b24146 is described below
commit 12e31b241464dfb6d6feb1cedb6e9e9af1d3b94e
Author: Clint Wylie <[email protected]>
AuthorDate: Fri May 8 02:23:26 2026 -0700
feat: persist __time min/max in V10 ProjectionMetadata (#19398)
changes:
* add nullable `minTime`/`maxTime` Long fields to `ProjectionMetadata`
alongside the existing `numRows`
* track min/max __time in `IndexMergerBase.mergeIndexesAndWriteColumns` via
`timestampSelector.getLong()` on each row, and surface them via `IndexMergeResult`
(both are null when no rows are walked)
* `IndexMergerV10` wires the merge result's values into the base-table
`ProjectionMetadata`; aggregate projections leave the new fields null for now,
though populating them would be straightforward follow-up work if it proves useful
* tracking is done regardless of segment sort order: non-time-sorted
segments (`DimensionsSpec.forceSegmentSortByTime` = false) also store these
values accurately. They are less useful there, since they can't be used to
skip rows, but they can still tell us when a segment has no rows within a
queried time range
* write-side only: fields are persisted but not yet consumed. A follow-up
will add a partial V10-aware `TimeBoundaryInspector` that reads them.
---
.../embedded/query/QueryVirtualStorageTest.java | 5 +-
.../org/apache/druid/segment/IndexMergerBase.java | 37 +++++-
.../org/apache/druid/segment/IndexMergerV10.java | 8 +-
.../druid/segment/file/SegmentFileMetadata.java | 9 ++
.../segment/projections/ProjectionMetadata.java | 63 ++++++++-
.../segment/IndexMergerV10MinMaxTimeTest.java | 145 +++++++++++++++++++++
.../projections/ProjectionMetadataTest.java | 26 ++++
7 files changed, 283 insertions(+), 10 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index a55e3859fd6..3ffaebe1131 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -67,7 +67,7 @@ import java.util.concurrent.ThreadLocalRandom;
class QueryVirtualStorageTest extends EmbeddedClusterTestBase
{
// size of wiki segments, adjust this if segment size changes for some reason
- private static final long SIZE_BYTES = 3776682L;
+ private static final long SIZE_BYTES = 3777834L;
private static final long CACHE_SIZE = HumanReadableBytes.parse("1MiB");
private static final long MAX_SIZE = HumanReadableBytes.parse("100MiB");
@@ -294,7 +294,8 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
Assertions.assertTrue(segmentChannelCounters.getLoadFiles()[0] > 0 &&
segmentChannelCounters.getLoadFiles()[0] <=
segmentChannelCounters.getFiles()[0]);
// size of all segments at time of writing, possibly we have to load all
of them, but possibly less depending on
// test order
- Assertions.assertTrue(segmentChannelCounters.getLoadBytes()[0] > 0 &&
segmentChannelCounters.getLoadBytes()[0] <= SIZE_BYTES);
+ Assertions.assertTrue(segmentChannelCounters.getLoadBytes()[0] > 0);
+ Assertions.assertTrue(segmentChannelCounters.getLoadBytes()[0] <=
SIZE_BYTES);
Assertions.assertTrue(segmentChannelCounters.getLoadTime()[0] > 0);
Assertions.assertTrue(segmentChannelCounters.getLoadWait()[0] > 0);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
index 06cde93fa3d..1283b4475b4 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java
@@ -776,10 +776,19 @@ public abstract class IndexMergerBase implements
IndexMerger
rowNumConversions.add(IntBuffer.wrap(arr));
}
+ long minTime = Long.MAX_VALUE;
+ long maxTime = Long.MIN_VALUE;
long time = System.currentTimeMillis();
while (timeAndDimsIterator.moveToNext()) {
progress.progress();
TimeAndDimsPointer timeAndDims = timeAndDimsIterator.getPointer();
+ final long timestamp = timeAndDims.timestampSelector.getLong();
+ if (timestamp < minTime) {
+ minTime = timestamp;
+ }
+ if (timestamp > maxTime) {
+ maxTime = timestamp;
+ }
timeWriter.serialize(timeAndDims.timestampSelector);
for (int metricIndex = 0; metricIndex < timeAndDims.getNumMetrics();
metricIndex++) {
@@ -837,7 +846,12 @@ public abstract class IndexMergerBase implements
IndexMerger
}
log.debug("completed walk through of %,d rows in %,d millis.", rowCount,
System.currentTimeMillis() - startTime);
progress.stopSection(section);
- return new IndexMergeResult(rowNumConversions, rowCount);
+ return new IndexMergeResult(
+ rowNumConversions,
+ rowCount,
+ rowCount == 0 ? null : minTime,
+ rowCount == 0 ? null : maxTime
+ );
}
protected GenericColumnSerializer setupTimeWriter(
@@ -1165,11 +1179,30 @@ public abstract class IndexMergerBase implements
IndexMerger
@Nullable
protected final List<IntBuffer> rowNumConversions;
protected final int rowCount;
+ /**
+ * Minimum {@code __time} value across all rows walked during the merge,
or {@code null} if {@link #rowCount} is
+ * zero.
+ */
+ @Nullable
+ protected final Long minTime;
+ /**
+ * Maximum {@code __time} value across all rows walked during the merge,
or {@code null} if {@link #rowCount} is
+ * zero.
+ */
+ @Nullable
+ protected final Long maxTime;
- private IndexMergeResult(@Nullable List<IntBuffer> rowNumConversions, int
rowCount)
+ private IndexMergeResult(
+ @Nullable List<IntBuffer> rowNumConversions,
+ int rowCount,
+ @Nullable Long minTime,
+ @Nullable Long maxTime
+ )
{
this.rowNumConversions = rowNumConversions;
this.rowCount = rowCount;
+ this.minTime = minTime;
+ this.maxTime = maxTime;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
index 8c8a6d862d5..91d2661841e 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java
@@ -251,7 +251,13 @@ public class IndexMergerV10 extends IndexMergerBase
final List<ProjectionMetadata> projections = new ArrayList<>();
// ingestion current builds v9 metadata... translate v9 metadata and
projection stuff to v10 format
projections.add(
- ProjectionMetadata.forBaseTable(indexMergeResult.rowCount,
mergedDimensionsWithTime, segmentMetadata)
+ ProjectionMetadata.forBaseTable(
+ indexMergeResult.rowCount,
+ indexMergeResult.minTime,
+ indexMergeResult.maxTime,
+ mergedDimensionsWithTime,
+ segmentMetadata
+ )
);
// make the projections
diff --git
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
index 18635220a22..c8e25cfe719 100644
---
a/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
+++
b/processing/src/main/java/org/apache/druid/segment/file/SegmentFileMetadata.java
@@ -108,6 +108,15 @@ public class SegmentFileMetadata
return files;
}
+ /**
+ * The segment's declared interval (the bucket-aligned time range it
covers), as supplied by the writer at build
+ * time and serialized as an ISO-8601 interval string. May be wider than the
actual data's time range; the start
+ * typically reflects the schema's bucket minimum (e.g. start-of-day for a
daily-granularity segment) and the end
+ * is rounded up to the next query-granularity bucket boundary after the
latest row. For exact, data-derived
+ * bounds (e.g. for time-boundary queries) use {@link
ProjectionMetadata#getMinTime} / {@link
+ * ProjectionMetadata#getMaxTime} on the entries in {@link #projections},
which are populated by newer writers and
+ * reflect the true per-projection min/max {@code __time} across all rows.
+ */
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getInterval()
diff --git
a/processing/src/main/java/org/apache/druid/segment/projections/ProjectionMetadata.java
b/processing/src/main/java/org/apache/druid/segment/projections/ProjectionMetadata.java
index dd699036d51..650a0273261 100644
---
a/processing/src/main/java/org/apache/druid/segment/projections/ProjectionMetadata.java
+++
b/processing/src/main/java/org/apache/druid/segment/projections/ProjectionMetadata.java
@@ -20,11 +20,13 @@
package org.apache.druid.segment.projections;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.druid.segment.Metadata;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
@@ -35,7 +37,13 @@ public class ProjectionMetadata
{
public static final Interner<ProjectionSchema> SCHEMA_INTERNER =
Interners.newWeakInterner();
- public static ProjectionMetadata forBaseTable(int numRows, List<String>
dims, Metadata metadata)
+ public static ProjectionMetadata forBaseTable(
+ int numRows,
+ @Nullable Long minTime,
+ @Nullable Long maxTime,
+ List<String> dims,
+ Metadata metadata
+ )
{
final ProjectionSchema schema;
if (Boolean.TRUE.equals(metadata.isRollup())) {
@@ -43,20 +51,44 @@ public class ProjectionMetadata
} else {
schema = TableProjectionSchema.fromMetadata(dims, metadata);
}
- return new ProjectionMetadata(numRows, schema);
+ return new ProjectionMetadata(numRows, schema, minTime, maxTime);
}
private final int numRows;
private final ProjectionSchema schema;
+ /**
+ * Minimum {@code __time} value across all rows in this projection, or
{@code null} if the writer didn't supply one
+ * (e.g. older segments written before this field existed, or projections
that don't track it).
+ * <p>
+ * The value is independent of row order: it reflects the actual minimum
timestamp across all walked rows even when
+ * the projection is not time-sorted. Readers can therefore treat both this
field and {@link #maxTime} as exact
+ * bounds whenever they are present.
+ */
+ @Nullable
+ private final Long minTime;
+ /**
+ * Maximum {@code __time} value across all rows in this projection. See
{@link #minTime} for semantics.
+ */
+ @Nullable
+ private final Long maxTime;
@JsonCreator
public ProjectionMetadata(
@JsonProperty("numRows") int numRows,
- @JsonProperty("schema") ProjectionSchema schema
+ @JsonProperty("schema") ProjectionSchema schema,
+ @JsonProperty("minTime") @Nullable Long minTime,
+ @JsonProperty("maxTime") @Nullable Long maxTime
)
{
this.numRows = numRows;
this.schema = SCHEMA_INTERNER.intern(schema);
+ this.minTime = minTime;
+ this.maxTime = maxTime;
+ }
+
+ public ProjectionMetadata(int numRows, ProjectionSchema schema)
+ {
+ this(numRows, schema, null, null);
}
@JsonProperty
@@ -71,6 +103,22 @@ public class ProjectionMetadata
return schema;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Long getMinTime()
+ {
+ return minTime;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Long getMaxTime()
+ {
+ return maxTime;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -78,13 +126,16 @@ public class ProjectionMetadata
return false;
}
ProjectionMetadata that = (ProjectionMetadata) o;
- return numRows == that.numRows && Objects.equals(schema, that.schema);
+ return numRows == that.numRows
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(minTime, that.minTime)
+ && Objects.equals(maxTime, that.maxTime);
}
@Override
public int hashCode()
{
- return Objects.hash(numRows, schema);
+ return Objects.hash(numRows, schema, minTime, maxTime);
}
@Override
@@ -93,6 +144,8 @@ public class ProjectionMetadata
return "ProjectionMetadata{" +
"numRows=" + numRows +
", schema=" + schema +
+ ", minTime=" + minTime +
+ ", maxTime=" + maxTime +
'}';
}
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerV10MinMaxTimeTest.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10MinMaxTimeTest.java
new file mode 100644
index 00000000000..cddd50e47c0
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/IndexMergerV10MinMaxTimeTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.file.SegmentFileMapperV10;
+import org.apache.druid.segment.file.SegmentFileMetadata;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+class IndexMergerV10MinMaxTimeTest extends InitializedNullHandlingTest
+{
+ @TempDir
+ File tempDir;
+
+ @Test
+ void testMinMaxTimePersistedForTimeSortedSegment() throws Exception
+ {
+ // Default forceSegmentSortByTime=true: __time is implicitly first in the
sort order, so the segment is sorted
+ // ascending by __time. The merge loop should still capture min/max via
timestampSelector.
+ final DateTime base = DateTimes.of("2025-01-01");
+ final RowSignature signature = RowSignature.builder()
+ .add("dim", ColumnType.STRING)
+ .add("metric", ColumnType.LONG)
+ .build();
+ final List<InputRow> rows = Arrays.asList(
+ new ListBasedInputRow(signature, base, signature.getColumnNames(),
Arrays.asList("a", 1L)),
+ new ListBasedInputRow(signature, base.plusMinutes(5),
signature.getColumnNames(), Arrays.asList("b", 2L)),
+ new ListBasedInputRow(signature, base.plusMinutes(10),
signature.getColumnNames(), Arrays.asList("c", 3L)),
+ new ListBasedInputRow(signature, base.plusMinutes(20),
signature.getColumnNames(), Arrays.asList("d", 4L))
+ );
+
+ final File segmentDir = buildV10Segment(
+ rows,
+ DimensionsSpec.builder()
+ .setDimensions(
+ List.of(
+ new StringDimensionSchema("dim"),
+ new LongDimensionSchema("metric")
+ )
+ )
+ .build()
+ );
+
+ assertBaseProjectionMinMaxTime(segmentDir, base.getMillis(),
base.plusMinutes(20).getMillis());
+ }
+
+ @Test
+ void testMinMaxTimePersistedForNonTimeSortedSegment() throws Exception
+ {
+ // forceSegmentSortByTime=false: the segment is sorted by the explicit
dimension order (dim, then __time), so
+ // min/max time do NOT correspond to the first/last row positions. The
merge loop must capture min/max from the
+ // timestampSelector regardless of physical row order.
+ final DateTime base = DateTimes.of("2025-01-01");
+ final RowSignature signature = RowSignature.builder()
+ .add("dim", ColumnType.STRING)
+ .build();
+ final List<InputRow> rows = Arrays.asList(
+ new ListBasedInputRow(signature, base.plusMinutes(20),
signature.getColumnNames(), List.of("c")),
+ new ListBasedInputRow(signature, base, signature.getColumnNames(),
List.of("a")),
+ new ListBasedInputRow(signature, base.plusMinutes(10),
signature.getColumnNames(), List.of("b"))
+ );
+
+ final File segmentDir = buildV10Segment(
+ rows,
+ DimensionsSpec.builder()
+ .setDimensions(
+ List.of(
+ new StringDimensionSchema("dim"),
+ new LongDimensionSchema("__time")
+ )
+ )
+ .setForceSegmentSortByTime(false)
+ .build()
+ );
+
+ assertBaseProjectionMinMaxTime(segmentDir, base.getMillis(),
base.plusMinutes(20).getMillis());
+ }
+
+ private File buildV10Segment(List<InputRow> rows, DimensionsSpec
dimensionsSpec)
+ {
+ final long minTs =
rows.stream().mapToLong(InputRow::getTimestampFromEpoch).min().orElseThrow();
+ return IndexBuilder.create()
+ .useV10()
+ .tmpDir(tempDir)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dimensionsSpec)
+ .withRollup(false)
+ .withMinTimestamp(minTs)
+ .build()
+ )
+ .rows(rows)
+ .buildMMappedIndexFile();
+ }
+
+ private void assertBaseProjectionMinMaxTime(File segmentDir, long
expectedMin, long expectedMax) throws Exception
+ {
+ final File v10File = new File(segmentDir, IndexIO.V10_FILE_NAME);
+ try (SegmentFileMapperV10 mapper = SegmentFileMapperV10.create(v10File,
TestHelper.JSON_MAPPER)) {
+ final SegmentFileMetadata metadata = mapper.getSegmentFileMetadata();
+ Assertions.assertNotNull(metadata.getProjections());
+ Assertions.assertFalse(metadata.getProjections().isEmpty());
+ // Base table is always the first projection in V10 metadata.
+ final ProjectionMetadata baseProjection =
metadata.getProjections().get(0);
+ Assertions.assertEquals(Long.valueOf(expectedMin),
baseProjection.getMinTime(), "minTime");
+ Assertions.assertEquals(Long.valueOf(expectedMax),
baseProjection.getMaxTime(), "maxTime");
+ }
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionMetadataTest.java
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionMetadataTest.java
index f7c58dbcd1a..821511150bb 100644
---
a/processing/src/test/java/org/apache/druid/segment/projections/ProjectionMetadataTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/projections/ProjectionMetadataTest.java
@@ -72,6 +72,32 @@ class ProjectionMetadataTest
);
}
+ @Test
+ void testSerdeWithMinMaxTime() throws JsonProcessingException
+ {
+ final ProjectionMetadata spec = new ProjectionMetadata(
+ 12345,
+ new AggregateProjectionSchema(
+ "some_projection",
+ "time",
+ null,
+ VirtualColumns.EMPTY,
+ Arrays.asList("a", "time"),
+ new AggregatorFactory[]{new CountAggregatorFactory("count")},
+ Arrays.asList(OrderBy.ascending("a"), OrderBy.ascending("time"))
+ ),
+ 1_700_000_000_000L,
+ 1_700_000_999_999L
+ );
+ final ProjectionMetadata roundTripped = JSON_MAPPER.readValue(
+ JSON_MAPPER.writeValueAsString(spec),
+ ProjectionMetadata.class
+ );
+ Assertions.assertEquals(spec, roundTripped);
+ Assertions.assertEquals(Long.valueOf(1_700_000_000_000L),
roundTripped.getMinTime());
+ Assertions.assertEquals(Long.valueOf(1_700_000_999_999L),
roundTripped.getMaxTime());
+ }
+
@Test
void testEqualsAndHashcode()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]