This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ae6afc0751b Extend unused segment metadata api response to include 
created date and last used updated time (#15738)
ae6afc0751b is described below

commit ae6afc0751b8912b7beb374ffca589e288b3ed9f
Author: zachjsh <[email protected]>
AuthorDate: Fri Jan 26 15:47:40 2024 -0500

    Extend unused segment metadata api response to include created date and 
last used updated time (#15738)
    
    ### Description
    
    The unusedSegment api response was extended to include the original 
DataSegment object with the creation time and last used update time added to 
it. A new object `DataSegmentPlus` was created for this purpose, and the 
metadata queries used were updated as needed.
    
    example response:
    
    ```
    [
      {
        "dataSegment": {
          "dataSource": "inline_data",
          "interval": "2023-01-02T00:00:00.000Z/2023-01-03T00:00:00.000Z",
          "version": "2024-01-25T16:06:42.835Z",
          "loadSpec": {
            "type": "local",
            "path": 
"/Users/zachsherman/projects/opensrc-druid/distribution/target/apache-druid-30.0.0-SNAPSHOT/var/druid/segments/inline_data/2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z/2024-01-25T16:06:42.835Z/0/index/"
          },
          "dimensions": "str_dim,double_measure1,double_measure2",
          "metrics": "",
          "shardSpec": {
            "type": "numbered",
            "partitionNum": 0,
            "partitions": 1
          },
          "binaryVersion": 9,
          "size": 1402,
          "identifier": 
"inline_data_2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z_2024-01-25T16:06:42.835Z"
        },
        "createdDate": "2024-01-25T16:06:44.196Z",
        "usedStatusLastUpdatedDate": "2024-01-25T16:07:34.909Z"
      }
    ]
    ```
---
 .../druid/metadata/SegmentsMetadataManager.java    |  13 +-
 .../druid/metadata/SqlSegmentsMetadataManager.java |  12 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   | 211 ++++++++++++++++-
 .../apache/druid/server/http/DataSegmentPlus.java  | 113 +++++++++
 .../apache/druid/server/http/MetadataResource.java |  10 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 254 +++++++++++++++++++--
 .../simulate/TestSegmentsMetadataManager.java      |   3 +-
 .../druid/server/http/DataSegmentPlusTest.java     | 149 ++++++++++++
 .../druid/server/http/MetadataResourceTest.java    |  37 +--
 9 files changed, 748 insertions(+), 54 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 94c2fae60fd..611c19f8462 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
@@ -126,11 +127,11 @@ public interface SegmentsMetadataManager
   );
 
   /**
-   * Returns an iterable to go over un-used segments for a given datasource 
over an optional interval.
-   * The order in which segments are iterated is from earliest start-time, 
with ties being broken with earliest end-time
-   * first. Note: the iteration may not be as trivially cheap as,
-   * for example, iteration over an ArrayList. Try (to some reasonable extent) 
to organize the code so that it
-   * iterates the returned iterable only once rather than several times.
+   * Returns an iterable to go over un-used segments and their associated 
metadata for a given datasource over an
+   * optional interval. The order in which segments are iterated is from 
earliest start-time, with ties being broken
+   * with earliest end-time first. Note: the iteration may not be as trivially 
cheap as, for example, iteration over an
+   * ArrayList. Try (to some reasonable extent) to organize the code so that 
it iterates the returned iterable only
+   * once rather than several times.
    *
    * @param datasource    the name of the datasource.
    * @param interval      an optional interval to search over. If none is 
specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
@@ -141,7 +142,7 @@ public interface SegmentsMetadataManager
    * @param sortOrder     an optional order with which to return the matching 
segments by id, start time, end time.
    *                      If none is specified, the order of the results is 
not guarenteed.
    */
-  Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
       String datasource,
       @Nullable Interval interval,
       @Nullable Integer limit,
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 0b423006152..88dd26c03d0 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -47,6 +47,7 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
@@ -957,8 +958,9 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
   }
 
   /**
-   * Retrieves segments for a given datasource that are marked unused and that 
are *fully contained by* an optionally
-   * specified interval. If the interval specified is null, this method will 
retrieve all unused segments.
+   * Retrieves segments and their associated metadata for a given datasource 
that are marked unused and that are
+   * *fully contained by* an optionally specified interval. If the interval 
specified is null, this method will
+   * retrieve all unused segments.
    *
    * This call does not return any information about realtime segments.
    *
@@ -976,7 +978,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * Returns an iterable.
    */
   @Override
-  public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
       final String datasource,
       @Nullable final Interval interval,
       @Nullable final Integer limit,
@@ -993,8 +995,8 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
               interval == null
                   ? Intervals.ONLY_ETERNITY
                   : Collections.singletonList(interval);
-          try (final CloseableIterator<DataSegment> iterator =
-                   queryTool.retrieveUnusedSegments(datasource, intervals, 
limit, lastSegmentId, sortOrder, null)) {
+          try (final CloseableIterator<DataSegmentPlus> iterator =
+                   queryTool.retrieveUnusedSegmentsPlus(datasource, intervals, 
limit, lastSegmentId, sortOrder, null)) {
             return ImmutableList.copyOf(iterator);
           }
         }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 3bd7c48ad05..adbfb0fda23 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
@@ -173,6 +174,49 @@ public class SqlSegmentsMetadataQuery
     );
   }
 
+  /**
+   * Similar to {@link #retrieveUnusedSegments}, but also retrieves associated 
metadata for the segments for a given
+   * datasource that are marked unused and that are *fully contained by* any 
interval in a particular collection of
+   * intervals. If the collection of intervals is empty, this method will 
retrieve all unused segments.
+   *
+   * This call does not return any information about realtime segments.
+   *
+   * @param dataSource    The name of the datasource
+   * @param intervals     The intervals to search over
+   * @param limit         The limit of segments to return
+   * @param lastSegmentId the last segment id from which to search for 
results. All segments returned are >
+   *                      this segment lexicographically if sortOrder is null or 
ASC, or < this segment
+   *                      lexicographically if sortOrder is DESC.
+   * @param sortOrder     Specifies the order with which to return the 
matching segments by start time, end time.
+   *                      A null value indicates that order does not matter.
+   * @param maxUsedStatusLastUpdatedTime The maximum {@code 
used_status_last_updated} time. Any unused segment in {@code intervals}
+   *                                   with {@code used_status_last_updated} 
no later than this time will be included in the
+   *                                   iterator. Segments without {@code 
used_status_last_updated} time (due to an upgrade
+   *                                   from legacy Druid) will have {@code 
maxUsedStatusLastUpdatedTime} ignored
+   *
+   * Returns a closeable iterator. You should close it when you are done.
+   */
+  public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    return retrieveSegmentsPlus(
+        dataSource,
+        intervals,
+        IntervalMode.CONTAINS,
+        false,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime
+    );
+  }
+
   /**
    * Marks the provided segments as either used or unused.
    *
@@ -445,6 +489,54 @@ public class SqlSegmentsMetadataQuery
     }
   }
 
+  private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
+      return CloseableIterators.withEmptyBaggage(
+          retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, 
matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
+      );
+    } else {
+      final List<List<Interval>> intervalsLists = Lists.partition(new 
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+      final List<Iterator<DataSegmentPlus>> resultingIterators = new 
ArrayList<>();
+      Integer limitPerBatch = limit;
+
+      for (final List<Interval> intervalList : intervalsLists) {
+        final UnmodifiableIterator<DataSegmentPlus> iterator = 
retrieveSegmentsPlusInIntervalsBatch(
+            dataSource,
+            intervalList,
+            matchMode,
+            used,
+            limitPerBatch,
+            lastSegmentId,
+            sortOrder,
+            maxUsedStatusLastUpdatedTime
+        );
+        if (limitPerBatch != null) {
+          // If limit is provided, we need to shrink the limit for subsequent 
batches or circuit break if
+          // we have reached what was requested for.
+          final List<DataSegmentPlus> dataSegments = 
ImmutableList.copyOf(iterator);
+          resultingIterators.add(dataSegments.iterator());
+          if (dataSegments.size() >= limitPerBatch) {
+            break;
+          }
+          limitPerBatch -= dataSegments.size();
+        } else {
+          resultingIterators.add(iterator);
+        }
+      }
+      return 
CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+    }
+  }
+
   private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
       final String dataSource,
       final Collection<Interval> intervals,
@@ -455,12 +547,73 @@ public class SqlSegmentsMetadataQuery
       @Nullable final SortOrder sortOrder,
       @Nullable final DateTime maxUsedStatusLastUpdatedTime
   )
+  {
+    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+        dataSource,
+        intervals,
+        matchMode,
+        used,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime,
+        false
+    );
+
+    final ResultIterator<DataSegment> resultIterator = 
getDataSegmentResultIterator(sql);
+
+    return filterDataSegmentIteratorByInterval(resultIterator, intervals, 
matchMode);
+  }
+
+  private UnmodifiableIterator<DataSegmentPlus> 
retrieveSegmentsPlusInIntervalsBatch(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+
+    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+        dataSource,
+        intervals,
+        matchMode,
+        used,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime,
+        true
+    );
+
+    final ResultIterator<DataSegmentPlus> resultIterator = 
getDataSegmentPlusResultIterator(sql);
+
+    return filterDataSegmentPlusIteratorByInterval(resultIterator, intervals, 
matchMode);
+  }
+
+  private Query<Map<String, Object>> buildSegmentsTableQuery(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit,
+      @Nullable final String lastSegmentId,
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime,
+      final boolean includeExtraInfo
+  )
   {
     // Check if the intervals all support comparing as strings. If so, bake 
them into the SQL.
     final boolean compareAsString = 
intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
-
     final StringBuilder sb = new StringBuilder();
-    sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = 
:dataSource");
+    if (includeExtraInfo) {
+      sb.append("SELECT payload, created_date, used_status_last_updated FROM 
%s WHERE used = :used AND dataSource = :dataSource");
+    } else {
+      sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = 
:dataSource");
+    }
 
     if (compareAsString) {
       appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, 
connector);
@@ -513,10 +666,31 @@ public class SqlSegmentsMetadataQuery
       bindQueryIntervals(sql, intervals);
     }
 
-    final ResultIterator<DataSegment> resultIterator =
-        sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, 
r.getBytes(1), DataSegment.class))
-           .iterator();
+    return sql;
+  }
+
+  private ResultIterator<DataSegment> 
getDataSegmentResultIterator(Query<Map<String, Object>> sql)
+  {
+    return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, 
r.getBytes(1), DataSegment.class))
+        .iterator();
+  }
 
+  private ResultIterator<DataSegmentPlus> 
getDataSegmentPlusResultIterator(Query<Map<String, Object>> sql)
+  {
+    return sql.map((index, r, ctx) -> new DataSegmentPlus(
+            JacksonUtils.readValue(jsonMapper, r.getBytes(1), 
DataSegment.class),
+            DateTimes.of(r.getString(2)),
+            DateTimes.of(r.getString(3))
+        ))
+        .iterator();
+  }
+
+  private UnmodifiableIterator<DataSegment> 
filterDataSegmentIteratorByInterval(
+      ResultIterator<DataSegment> resultIterator,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode
+  )
+  {
     return Iterators.filter(
         resultIterator,
         dataSegment -> {
@@ -538,6 +712,33 @@ public class SqlSegmentsMetadataQuery
     );
   }
 
+  private UnmodifiableIterator<DataSegmentPlus> 
filterDataSegmentPlusIteratorByInterval(
+      ResultIterator<DataSegmentPlus> resultIterator,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode
+  )
+  {
+    return Iterators.filter(
+        resultIterator,
+        dataSegment -> {
+          if (intervals.isEmpty()) {
+            return true;
+          } else {
+            // Must re-check that the interval matches, even if comparing as 
string, because the *segment interval*
+            // might not be string-comparable. (Consider a query interval like 
"2000-01-01/3000-01-01" and a
+            // segment interval like "20010/20011".)
+            for (Interval interval : intervals) {
+              if (matchMode.apply(interval, 
dataSegment.getDataSegment().getInterval())) {
+                return true;
+              }
+            }
+
+            return false;
+          }
+        }
+    );
+  }
+
   private static int computeNumChangedSegments(List<String> segmentIds, int[] 
segmentChanges)
   {
     int numChangedSegments = 0;
diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java 
b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
new file mode 100644
index 00000000000..2a0d7fc05ea
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Encapsulates a {@link DataSegment} and additional metadata about it:
+ * <ul><li>{@link DataSegmentPlus#createdDate}:               The time when the 
segment was created </li>
+ * <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate}: The time when the 
segment's used status was last updated </li></ul>
+ * <p>
+ * The class closely resembles the row structure of the {@link 
MetadataStorageTablesConfig#getSegmentsTable()}
+ * </p>
+ */
+@UnstableApi
+public class DataSegmentPlus
+{
+  private final DataSegment dataSegment;
+  private final DateTime createdDate;
+  @Nullable
+  private final DateTime usedStatusLastUpdatedDate;
+
+  @JsonCreator
+  public DataSegmentPlus(
+      @JsonProperty("dataSegment") final DataSegment dataSegment,
+      @JsonProperty("createdDate") final DateTime createdDate,
+      @JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime 
usedStatusLastUpdatedDate
+  )
+  {
+    this.dataSegment = dataSegment;
+    this.createdDate = createdDate;
+    this.usedStatusLastUpdatedDate = usedStatusLastUpdatedDate;
+  }
+
+  @JsonProperty
+  public DateTime getCreatedDate()
+  {
+    return createdDate;
+  }
+
+  @Nullable
+  @JsonProperty
+  public DateTime getUsedStatusLastUpdatedDate()
+  {
+    return usedStatusLastUpdatedDate;
+  }
+
+  @JsonProperty
+  public DataSegment getDataSegment()
+  {
+    return dataSegment;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DataSegmentPlus that = (DataSegmentPlus) o;
+    return Objects.equals(dataSegment, that.getDataSegment())
+           && Objects.equals(createdDate, that.getCreatedDate())
+           && Objects.equals(usedStatusLastUpdatedDate, 
that.getUsedStatusLastUpdatedDate());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSegment,
+        createdDate,
+        usedStatusLastUpdatedDate
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DataSegmentPlus{" +
+           "createdDate=" + getCreatedDate() +
+           ", usedStatusLastUpdatedDate=" + getUsedStatusLastUpdatedDate() +
+           ", dataSegment=" + getDataSegment() +
+           '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java 
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index fb976a04bf3..c53a998e238 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -363,7 +363,7 @@ public class MetadataResource
     SortOrder theSortOrder = sortOrder == null ? null : 
SortOrder.fromValue(sortOrder);
 
     final Interval theInterval = interval != null ? 
Intervals.of(interval.replace('_', '/')) : null;
-    Iterable<DataSegment> unusedSegments = 
segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
+    Iterable<DataSegmentPlus> unusedSegments = 
segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
         dataSource,
         theInterval,
         limit,
@@ -371,13 +371,13 @@ public class MetadataResource
         theSortOrder
     );
 
-    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = 
segment -> Collections.singletonList(
-        
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+    final Function<DataSegmentPlus, Iterable<ResourceAction>> raGenerator = 
segment -> Collections.singletonList(
+        
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
 
-    final Iterable<DataSegment> authorizedSegments =
+    final Iterable<DataSegmentPlus> authorizedSegments =
         AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, 
raGenerator, authorizerMapper);
 
-    final List<DataSegment> retVal = new ArrayList<>();
+    final List<DataSegmentPlus> retVal = new ArrayList<>();
     authorizedSegments.iterator().forEachRemaining(retVal::add);
     return Response.status(Response.Status.OK).entity(retVal).build();
   }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0626792f1fd..da7bf422692 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -42,7 +42,9 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
@@ -84,6 +86,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class IndexerSQLMetadataStorageCoordinatorTest
@@ -1240,7 +1243,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() 
throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1251,13 +1255,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         ImmutableList.of(),
@@ -1268,13 +1283,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2033, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     String lastSegmentId = segments.get(9).getId().toString();
     final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
@@ -1290,6 +1316,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(expectedSegmentsAscOrder.size(), 
actualUnusedSegments.size());
     
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));
 
+    ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsAscOrder.size(), 
actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(expectedSegmentsAscOrder, 
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
     actualUnusedSegments = retrieveUnusedSegments(
         ImmutableList.of(),
         null,
@@ -1300,6 +1336,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(expectedSegmentsAscOrder.size(), 
actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
 
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        SortOrder.ASC,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsAscOrder.size(), 
actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegmentsAscOrder, 
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
     final List<DataSegment> expectedSegmentsDescOrder = segments.stream()
         .filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0)
         .collect(Collectors.toList());
@@ -1314,13 +1360,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(expectedSegmentsDescOrder.size(), 
actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
+
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        null,
+        lastSegmentId,
+        SortOrder.DESC,
+        null
+    );
+    Assert.assertEquals(expectedSegmentsDescOrder.size(), 
actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegmentsDescOrder, 
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1331,13 +1388,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        segments.size(),
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size() - 1;
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
@@ -1347,18 +1415,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null,
         null
     );
+    final List<DataSegment> expectedSegments = 
segments.stream().limit(requestedLimit).collect(Collectors.toList());
     Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
-    
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+    Assert.assertTrue(actualUnusedSegments.containsAll(expectedSegments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        requestedLimit,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(requestedLimit, actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId()
 throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2034, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size();
     final String lastSegmentId = segments.get(4).getId().toString();
+    final List<DataSegment> expectedSegments = segments.stream()
+        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+        .limit(requestedLimit)
+        .collect(Collectors.toList());
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
@@ -1367,20 +1451,32 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null
     );
     Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
-    Assert.assertEquals(actualUnusedSegments, segments.stream()
-        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
-        .limit(requestedLimit)
-        .collect(Collectors.toList()));
+    Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(),
+        requestedLimit,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size() - 5, actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final int requestedLimit = segments.size() - 1;
     final String lastSegmentId = segments.get(4).getId().toString();
+    final List<DataSegment> expectedSegments = segments.stream()
+        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+        .limit(requestedLimit)
+        .collect(Collectors.toList());
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
@@ -1389,17 +1485,25 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         null
     );
     Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
-    Assert.assertEquals(actualUnusedSegments, segments.stream()
-        .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
-        .limit(requestedLimit)
-        .collect(Collectors.toList()));
+    Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        requestedLimit,
+        lastSegmentId,
+        null,
+        null
+    );
+    Assert.assertEquals(requestedLimit - 4, actualUnusedSegmentsPlus.size());
+    verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
   public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws 
IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegments(
         
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1410,6 +1514,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size() + 1,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+    verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus, 
usedStatusLastUpdatedTime);
   }
 
   @Test
@@ -1430,13 +1544,23 @@ public class IndexerSQLMetadataStorageCoordinatorTest
          null
     );
     Assert.assertEquals(0, actualUnusedSegments.size());
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(outOfRangeInterval),
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
   }
 
   @Test
   public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime() 
throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 
1910);
-    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+    DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
 
     final Interval interval = Intervals.of("1905/1920");
 
@@ -1449,6 +1573,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(5, actualUnusedSegments1.size());
 
+    ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+    Assert.assertEquals(5, actualUnusedSegmentsPlus.size());
+
     final ImmutableList<DataSegment> actualUnusedSegments2 = 
retrieveUnusedSegments(
         ImmutableList.of(interval),
         null,
@@ -1457,6 +1590,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         DateTimes.nowUtc().minusHours(1)
     );
     Assert.assertEquals(0, actualUnusedSegments2.size());
+
+    actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc().minusHours(1)
+    );
+    Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
   }
 
   @Test
@@ -1492,6 +1634,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size());
 
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus1 = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime1
+    );
+    Assert.assertEquals(oddYearSegments.size(), 
actualUnusedSegmentsPlus1.size());
+
     final ImmutableList<DataSegment> actualUnusedSegments2 = 
retrieveUnusedSegments(
         ImmutableList.of(interval),
         null,
@@ -1500,6 +1651,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         maxUsedStatusLastUpdatedTime2
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments2.size());
+
+    final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus2 = 
retrieveUnusedSegmentsPlus(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime2
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus2.size());
   }
 
   @Test
@@ -3343,6 +3503,64 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  private ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+      final List<Interval> intervals,
+      final Integer limit,
+      final String lastSegmentId,
+      final SortOrder sortOrder,
+      final DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    return derbyConnector.inReadOnlyTransaction(
+        (handle, status) -> {
+          try (final CloseableIterator<DataSegmentPlus> iterator =
+                   SqlSegmentsMetadataQuery.forHandle(
+                           handle,
+                           derbyConnector,
+                           
derbyConnectorRule.metadataTablesConfigSupplier().get(),
+                           mapper
+                       )
+                       .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, limit, 
lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+  }
+
+  private void verifyContainsAllSegmentsPlus(
+      List<DataSegment> expectedSegments,
+      List<DataSegmentPlus> actualUnusedSegmentsPlus,
+      DateTime usedStatusLastUpdatedTime)
+  {
+    Map<SegmentId, DataSegment> expectedIdToSegment = 
expectedSegments.stream().collect(Collectors.toMap(DataSegment::getId, 
Function.identity()));
+    Map<SegmentId, DataSegmentPlus> actualIdToSegmentPlus = 
actualUnusedSegmentsPlus.stream()
+        .collect(Collectors.toMap(d -> d.getDataSegment().getId(), 
Function.identity()));
+    Assert.assertTrue(expectedIdToSegment.entrySet().stream().allMatch(e -> {
+      DataSegmentPlus segmentPlus = actualIdToSegmentPlus.get(e.getKey());
+      return segmentPlus != null
+             && 
!segmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+             && segmentPlus.getUsedStatusLastUpdatedDate() != null
+             && 
segmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime);
+    }));
+  }
+
+  private void verifyEqualsAllSegmentsPlus(
+      List<DataSegment> expectedSegments,
+      List<DataSegmentPlus> actualUnusedSegmentsPlus,
+      DateTime usedStatusLastUpdatedTime
+  )
+  {
+    Assert.assertEquals(expectedSegments.size(), 
actualUnusedSegmentsPlus.size());
+    for (int i = 0; i < expectedSegments.size(); i++) {
+      DataSegment expectedSegment = expectedSegments.get(i);
+      DataSegmentPlus actualSegmentPlus = actualUnusedSegmentsPlus.get(i);
+      Assert.assertEquals(expectedSegment.getId(), 
actualSegmentPlus.getDataSegment().getId());
+      
Assert.assertTrue(!actualSegmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+                        && actualSegmentPlus.getUsedStatusLastUpdatedDate() != 
null
+                        && 
actualSegmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime));
+    }
+  }
+
   /**
    * This test-only shard type is to test the behavior of "old generation" tombstones with 1 core partition.
    */
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 1e2f2d462c4..92a40b9a443 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.metadata.SortOrder;
+import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
@@ -194,7 +195,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   @Override
-  public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+  public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
       String datasource,
       @Nullable Interval interval,
       @Nullable Integer limit,
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
new file mode 100644
index 00000000000..f2c9a68a8d5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class DataSegmentPlusTest
+{
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  private static final int TEST_VERSION = 0x9;
+
+  @Before
+  public void setUp()
+  {
+    InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(DataSegment.PruneSpecsHolder.class, 
DataSegment.PruneSpecsHolder.DEFAULT);
+    MAPPER.setInjectableValues(injectableValues);
+  }
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(DataSegmentPlus.class)
+        .withNonnullFields("dataSegment", "createdDate")
+        .usingGetClass()
+        .verify();
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", 
"or_other");
+
+    String createdDateStr = "2024-01-20T00:00:00.701Z";
+    String usedStatusLastUpdatedDateStr = "2024-01-20T01:00:00.701Z";
+    DateTime createdDate = DateTimes.of(createdDateStr);
+    DateTime usedStatusLastUpdatedDate = 
DateTimes.of(usedStatusLastUpdatedDateStr);
+    DataSegmentPlus segmentPlus = new DataSegmentPlus(
+        new DataSegment(
+            "something",
+            interval,
+            "1",
+            loadSpec,
+            Arrays.asList("dim1", "dim2"),
+            Arrays.asList("met1", "met2"),
+            new NumberedShardSpec(3, 0),
+            new CompactionState(
+                new HashedPartitionsSpec(100000, null, 
ImmutableList.of("dim1")),
+                new DimensionsSpec(
+                    DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", 
"bar", "foo"))
+                ),
+                ImmutableList.of(ImmutableMap.of("type", "count", "name", 
"count")),
+                ImmutableMap.of("filter", ImmutableMap.of("type", "selector", 
"dimension", "dim1", "value", "foo")),
+                ImmutableMap.of(),
+                ImmutableMap.of()
+            ),
+            TEST_VERSION,
+            1
+        ),
+        createdDate,
+        usedStatusLastUpdatedDate
+    );
+
+    final Map<String, Object> objectMap = MAPPER.readValue(
+        MAPPER.writeValueAsString(segmentPlus),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(3, objectMap.size());
+    final Map<String, Object> segmentObjectMap = MAPPER.readValue(
+        MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    // verify dataSegment
+    Assert.assertEquals(11, segmentObjectMap.size());
+    Assert.assertEquals("something", segmentObjectMap.get("dataSource"));
+    Assert.assertEquals(interval.toString(), segmentObjectMap.get("interval"));
+    Assert.assertEquals("1", segmentObjectMap.get("version"));
+    Assert.assertEquals(loadSpec, segmentObjectMap.get("loadSpec"));
+    Assert.assertEquals("dim1,dim2", segmentObjectMap.get("dimensions"));
+    Assert.assertEquals("met1,met2", segmentObjectMap.get("metrics"));
+    Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, 
"partitions", 0), segmentObjectMap.get("shardSpec"));
+    Assert.assertEquals(TEST_VERSION, segmentObjectMap.get("binaryVersion"));
+    Assert.assertEquals(1, segmentObjectMap.get("size"));
+    Assert.assertEquals(6, ((Map) 
segmentObjectMap.get("lastCompactionState")).size());
+
+    // verify extra metadata
+    Assert.assertEquals(createdDateStr, objectMap.get("createdDate"));
+    Assert.assertEquals(usedStatusLastUpdatedDateStr, 
objectMap.get("usedStatusLastUpdatedDate"));
+
+    DataSegmentPlus deserializedSegmentPlus = 
MAPPER.readValue(MAPPER.writeValueAsString(segmentPlus), DataSegmentPlus.class);
+
+    // verify dataSegment
+    Assert.assertEquals(segmentPlus.getDataSegment().getDataSource(), 
deserializedSegmentPlus.getDataSegment().getDataSource());
+    Assert.assertEquals(segmentPlus.getDataSegment().getInterval(), 
deserializedSegmentPlus.getDataSegment().getInterval());
+    Assert.assertEquals(segmentPlus.getDataSegment().getVersion(), 
deserializedSegmentPlus.getDataSegment().getVersion());
+    Assert.assertEquals(segmentPlus.getDataSegment().getLoadSpec(), 
deserializedSegmentPlus.getDataSegment().getLoadSpec());
+    Assert.assertEquals(segmentPlus.getDataSegment().getDimensions(), 
deserializedSegmentPlus.getDataSegment().getDimensions());
+    Assert.assertEquals(segmentPlus.getDataSegment().getMetrics(), 
deserializedSegmentPlus.getDataSegment().getMetrics());
+    Assert.assertEquals(segmentPlus.getDataSegment().getShardSpec(), 
deserializedSegmentPlus.getDataSegment().getShardSpec());
+    Assert.assertEquals(segmentPlus.getDataSegment().getSize(), 
deserializedSegmentPlus.getDataSegment().getSize());
+    Assert.assertEquals(segmentPlus.getDataSegment().getId(), 
deserializedSegmentPlus.getDataSegment().getId());
+    Assert.assertEquals(segmentPlus.getDataSegment().getLastCompactionState(), 
deserializedSegmentPlus.getDataSegment().getLastCompactionState());
+
+    // verify extra metadata
+    Assert.assertEquals(segmentPlus.getCreatedDate(), 
deserializedSegmentPlus.getCreatedDate());
+    Assert.assertEquals(segmentPlus.getUsedStatusLastUpdatedDate(), 
deserializedSegmentPlus.getUsedStatusLastUpdatedDate());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 8c818c918f7..fa99a411d36 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -74,6 +75,10 @@ public class MetadataResourceTest
           .withNumPartitions(NUM_PARTITIONS)
           .eachOfSizeInMb(500)
           .toArray(new DataSegment[0]);
+
+  private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
+          .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), 
DateTimes.nowUtc()))
+          .collect(Collectors.toList());
   private HttpServletRequest request;
   private SegmentsMetadataManager segmentsMetadataManager;
   private IndexerMetadataStorageCoordinator storageCoordinator;
@@ -281,7 +286,7 @@ public class MetadataResourceTest
         null,
         null
     );
-    List<DataSegment> resultList = extractResponseList(response);
+    List<DataSegmentPlus> resultList = extractResponseList(response);
     Assert.assertTrue(resultList.isEmpty());
 
     // test valid datasource with bad limit - fails with expected invalid limit message
@@ -302,7 +307,7 @@ public class MetadataResourceTest
     response = metadataResource.getUnusedSegmentsInDataSource(request, 
DATASOURCE1, null, null, null, null);
 
     resultList = extractResponseList(response);
-    Assert.assertEquals(Arrays.asList(segments), resultList);
+    Assert.assertEquals(segmentsPlus, resultList);
 
     // test valid datasource with interval filter - returns all unused segments for that datasource within interval
     int numDays = 2;
@@ -311,7 +316,10 @@ public class MetadataResourceTest
 
     resultList = extractResponseList(response);
     Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size());
-    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2], 
segments[3]), resultList);
+    Assert.assertEquals(
+        Arrays.asList(segmentsPlus.get(0), segmentsPlus.get(1), 
segmentsPlus.get(2), segmentsPlus.get(3)),
+        resultList
+    );
 
     // test valid datasource with interval filter limit and last segment id - returns unused segments for that
     // datasource within interval upto limit starting at last segment id
@@ -320,7 +328,7 @@ public class MetadataResourceTest
 
     resultList = extractResponseList(response);
     Assert.assertEquals(limit, resultList.size());
-    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]), 
resultList);
+    Assert.assertEquals(Arrays.asList(segmentsPlus.get(0), 
segmentsPlus.get(1), segmentsPlus.get(2)), resultList);
 
     // test valid datasource with interval filter limit and offset - returns unused segments for that datasource within
     // interval upto limit starting at offset
@@ -334,10 +342,10 @@ public class MetadataResourceTest
     );
 
     resultList = extractResponseList(response);
-    Assert.assertEquals(Collections.singletonList(segments[3]), resultList);
+    Assert.assertEquals(Collections.singletonList(segmentsPlus.get(3)), 
resultList);
   }
 
-  Answer<Iterable<DataSegment>> mockIterateAllUnusedSegmentsForDatasource()
+  Answer<Iterable<DataSegmentPlus>> mockIterateAllUnusedSegmentsForDatasource()
   {
     return invocationOnMock -> {
       String dataSourceName = invocationOnMock.getArgument(0);
@@ -349,16 +357,17 @@ public class MetadataResourceTest
         return ImmutableList.of();
       }
 
-      return Arrays.stream(segments)
-          .filter(d -> d.getDataSource().equals(dataSourceName)
+      return segmentsPlus.stream()
+          .filter(d -> 
d.getDataSegment().getDataSource().equals(dataSourceName)
                        && (interval == null
-                           || (d.getInterval().getStartMillis() >= 
interval.getStartMillis()
-                               && d.getInterval().getEndMillis() <= 
interval.getEndMillis()))
+                           || 
(d.getDataSegment().getInterval().getStartMillis() >= interval.getStartMillis()
+                               && 
d.getDataSegment().getInterval().getEndMillis() <= interval.getEndMillis()))
                        && (lastSegmentId == null
-                           || (sortOrder == null && 
d.getId().toString().compareTo(lastSegmentId) > 0)
-                           || (sortOrder == SortOrder.ASC && 
d.getId().toString().compareTo(lastSegmentId) > 0)
-                           || (sortOrder == SortOrder.DESC && 
d.getId().toString().compareTo(lastSegmentId) < 0)))
-          .sorted((o1, o2) -> 
Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), 
o2.getInterval()))
+                           || (sortOrder == null && 
d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+                           || (sortOrder == SortOrder.ASC && 
d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+                           || (sortOrder == SortOrder.DESC && 
d.getDataSegment().getId().toString().compareTo(lastSegmentId) < 0)))
+          .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd()
+              .compare(o1.getDataSegment().getInterval(), 
o2.getDataSegment().getInterval()))
           .limit(limit != null
               ? limit
               : segments.length)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to