This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ae6afc0751b Extend unused segment metadata api response to include
created date and last used updated time (#15738)
ae6afc0751b is described below
commit ae6afc0751b8912b7beb374ffca589e288b3ed9f
Author: zachjsh <[email protected]>
AuthorDate: Fri Jan 26 15:47:40 2024 -0500
Extend unused segment metadata api response to include created date and
last used updated time (#15738)
### Description
The unusedSegment api response was extended to include the original
DataSegment object with the creation time and last used update time added to
it. A new object `DataSegmentPlus` was created for this purpose, and the
metadata queries used were updated as needed.
example response:
```
[
{
"dataSegment": {
"dataSource": "inline_data",
"interval": "2023-01-02T00:00:00.000Z/2023-01-03T00:00:00.000Z",
"version": "2024-01-25T16:06:42.835Z",
"loadSpec": {
"type": "local",
"path":
"/Users/zachsherman/projects/opensrc-druid/distribution/target/apache-druid-30.0.0-SNAPSHOT/var/druid/segments/inline_data/2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z/2024-01-25T16:06:42.835Z/0/index/"
},
"dimensions": "str_dim,double_measure1,double_measure2",
"metrics": "",
"shardSpec": {
"type": "numbered",
"partitionNum": 0,
"partitions": 1
},
"binaryVersion": 9,
"size": 1402,
"identifier":
"inline_data_2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z_2024-01-25T16:06:42.835Z"
},
"createdDate": "2024-01-25T16:06:44.196Z",
"usedStatusLastUpdatedDate": "2024-01-25T16:07:34.909Z"
}
]
```
---
.../druid/metadata/SegmentsMetadataManager.java | 13 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 12 +-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 211 ++++++++++++++++-
.../apache/druid/server/http/DataSegmentPlus.java | 113 +++++++++
.../apache/druid/server/http/MetadataResource.java | 10 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 254 +++++++++++++++++++--
.../simulate/TestSegmentsMetadataManager.java | 3 +-
.../druid/server/http/DataSegmentPlusTest.java | 149 ++++++++++++
.../druid/server/http/MetadataResourceTest.java | 37 +--
9 files changed, 748 insertions(+), 54 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 94c2fae60fd..611c19f8462 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
@@ -126,11 +127,11 @@ public interface SegmentsMetadataManager
);
/**
- * Returns an iterable to go over un-used segments for a given datasource
over an optional interval.
- * The order in which segments are iterated is from earliest start-time,
with ties being broken with earliest end-time
- * first. Note: the iteration may not be as trivially cheap as,
- * for example, iteration over an ArrayList. Try (to some reasonable extent)
to organize the code so that it
- * iterates the returned iterable only once rather than several times.
+ * Returns an iterable to go over un-used segments and their associated
metadata for a given datasource over an
+ * optional interval. The order in which segments are iterated is from
earliest start-time, with ties being broken
+ * with earliest end-time first. Note: the iteration may not be as trivially
cheap as for example, iteration over an
+ * ArrayList. Try (to some reasonable extent) to organize the code so that
it iterates the returned iterable only
+ * once rather than several times.
*
* @param datasource the name of the datasource.
* @param interval an optional interval to search over. If none is
specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
@@ -141,7 +142,7 @@ public interface SegmentsMetadataManager
* @param sortOrder an optional order with which to return the matching
segments by id, start time, end time.
* If none is specified, the order of the results is
not guarenteed.
*/
- Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 0b423006152..88dd26c03d0 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -47,6 +47,7 @@ import
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
@@ -957,8 +958,9 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
}
/**
- * Retrieves segments for a given datasource that are marked unused and that
are *fully contained by* an optionally
- * specified interval. If the interval specified is null, this method will
retrieve all unused segments.
+ * Retrieves segments and their associated metadata for a given datasource
that are marked unused and that are
+ * *fully contained by* an optionally specified interval. If the interval
specified is null, this method will
+ * retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
@@ -976,7 +978,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
* Returns an iterable.
*/
@Override
- public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
final String datasource,
@Nullable final Interval interval,
@Nullable final Integer limit,
@@ -993,8 +995,8 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
interval == null
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
- try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(datasource, intervals,
limit, lastSegmentId, sortOrder, null)) {
+ try (final CloseableIterator<DataSegmentPlus> iterator =
+ queryTool.retrieveUnusedSegmentsPlus(datasource, intervals,
limit, lastSegmentId, sortOrder, null)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 3bd7c48ad05..adbfb0fda23 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
@@ -173,6 +174,49 @@ public class SqlSegmentsMetadataQuery
);
}
+ /**
+ * Similar to {@link #retrieveUnusedSegments}, but also retrieves associated
metadata for the segments for a given
+ * datasource that are marked unused and that are *fully contained by* any
interval in a particular collection of
+ * intervals. If the collection of intervals is empty, this method will
retrieve all unused segments.
+ *
+ * This call does not return any information about realtime segments.
+ *
+ * @param dataSource The name of the datasource
+ * @param intervals The intervals to search over
+ * @param limit The limit of segments to return
+ * @param lastSegmentId the last segment id from which to search for
results. All segments returned are >
+ * this segment lexigraphically if sortOrder is null or
ASC, or < this segment
+ * lexigraphically if sortOrder is DESC.
+ * @param sortOrder Specifies the order with which to return the
matching segments by start time, end time.
+ * A null value indicates that order does not matter.
+ * @param maxUsedStatusLastUpdatedTime The maximum {@code
used_status_last_updated} time. Any unused segment in {@code intervals}
+ * with {@code used_status_last_updated}
no later than this time will be included in the
+ * iterator. Segments without {@code
used_status_last_updated} time (due to an upgrade
+ * from legacy Druid) will have {@code
maxUsedStatusLastUpdatedTime} ignored
+
+ * Returns a closeable iterator. You should close it when you are done.
+ */
+ public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
+ )
+ {
+ return retrieveSegmentsPlus(
+ dataSource,
+ intervals,
+ IntervalMode.CONTAINS,
+ false,
+ limit,
+ lastSegmentId,
+ sortOrder,
+ maxUsedStatusLastUpdatedTime
+ );
+ }
+
/**
* Marks the provided segments as either used or unused.
*
@@ -445,6 +489,54 @@ public class SqlSegmentsMetadataQuery
}
}
+ private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode,
+ final boolean used,
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
+ )
+ {
+ if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
+ return CloseableIterators.withEmptyBaggage(
+ retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals,
matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
+ );
+ } else {
+ final List<List<Interval>> intervalsLists = Lists.partition(new
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+ final List<Iterator<DataSegmentPlus>> resultingIterators = new
ArrayList<>();
+ Integer limitPerBatch = limit;
+
+ for (final List<Interval> intervalList : intervalsLists) {
+ final UnmodifiableIterator<DataSegmentPlus> iterator =
retrieveSegmentsPlusInIntervalsBatch(
+ dataSource,
+ intervalList,
+ matchMode,
+ used,
+ limitPerBatch,
+ lastSegmentId,
+ sortOrder,
+ maxUsedStatusLastUpdatedTime
+ );
+ if (limitPerBatch != null) {
+ // If limit is provided, we need to shrink the limit for subsequent
batches or circuit break if
+ // we have reached what was requested for.
+ final List<DataSegmentPlus> dataSegments =
ImmutableList.copyOf(iterator);
+ resultingIterators.add(dataSegments.iterator());
+ if (dataSegments.size() >= limitPerBatch) {
+ break;
+ }
+ limitPerBatch -= dataSegments.size();
+ } else {
+ resultingIterators.add(iterator);
+ }
+ }
+ return
CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+ }
+ }
+
private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
@@ -455,12 +547,73 @@ public class SqlSegmentsMetadataQuery
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
+ {
+ final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+ dataSource,
+ intervals,
+ matchMode,
+ used,
+ limit,
+ lastSegmentId,
+ sortOrder,
+ maxUsedStatusLastUpdatedTime,
+ false
+ );
+
+ final ResultIterator<DataSegment> resultIterator =
getDataSegmentResultIterator(sql);
+
+ return filterDataSegmentIteratorByInterval(resultIterator, intervals,
matchMode);
+ }
+
+ private UnmodifiableIterator<DataSegmentPlus>
retrieveSegmentsPlusInIntervalsBatch(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode,
+ final boolean used,
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
+ )
+ {
+
+ final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
+ dataSource,
+ intervals,
+ matchMode,
+ used,
+ limit,
+ lastSegmentId,
+ sortOrder,
+ maxUsedStatusLastUpdatedTime,
+ true
+ );
+
+ final ResultIterator<DataSegmentPlus> resultIterator =
getDataSegmentPlusResultIterator(sql);
+
+ return filterDataSegmentPlusIteratorByInterval(resultIterator, intervals,
matchMode);
+ }
+
+ private Query<Map<String, Object>> buildSegmentsTableQuery(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode,
+ final boolean used,
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime,
+ final boolean includeExtraInfo
+ )
{
// Check if the intervals all support comparing as strings. If so, bake
them into the SQL.
final boolean compareAsString =
intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
-
final StringBuilder sb = new StringBuilder();
- sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource =
:dataSource");
+ if (includeExtraInfo) {
+ sb.append("SELECT payload, created_date, used_status_last_updated FROM
%s WHERE used = :used AND dataSource = :dataSource");
+ } else {
+ sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource =
:dataSource");
+ }
if (compareAsString) {
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode,
connector);
@@ -513,10 +666,31 @@ public class SqlSegmentsMetadataQuery
bindQueryIntervals(sql, intervals);
}
- final ResultIterator<DataSegment> resultIterator =
- sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper,
r.getBytes(1), DataSegment.class))
- .iterator();
+ return sql;
+ }
+
+ private ResultIterator<DataSegment>
getDataSegmentResultIterator(Query<Map<String, Object>> sql)
+ {
+ return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper,
r.getBytes(1), DataSegment.class))
+ .iterator();
+ }
+ private ResultIterator<DataSegmentPlus>
getDataSegmentPlusResultIterator(Query<Map<String, Object>> sql)
+ {
+ return sql.map((index, r, ctx) -> new DataSegmentPlus(
+ JacksonUtils.readValue(jsonMapper, r.getBytes(1),
DataSegment.class),
+ DateTimes.of(r.getString(2)),
+ DateTimes.of(r.getString(3))
+ ))
+ .iterator();
+ }
+
+ private UnmodifiableIterator<DataSegment>
filterDataSegmentIteratorByInterval(
+ ResultIterator<DataSegment> resultIterator,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode
+ )
+ {
return Iterators.filter(
resultIterator,
dataSegment -> {
@@ -538,6 +712,33 @@ public class SqlSegmentsMetadataQuery
);
}
+ private UnmodifiableIterator<DataSegmentPlus>
filterDataSegmentPlusIteratorByInterval(
+ ResultIterator<DataSegmentPlus> resultIterator,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode
+ )
+ {
+ return Iterators.filter(
+ resultIterator,
+ dataSegment -> {
+ if (intervals.isEmpty()) {
+ return true;
+ } else {
+ // Must re-check that the interval matches, even if comparing as
string, because the *segment interval*
+ // might not be string-comparable. (Consider a query interval like
"2000-01-01/3000-01-01" and a
+ // segment interval like "20010/20011".)
+ for (Interval interval : intervals) {
+ if (matchMode.apply(interval,
dataSegment.getDataSegment().getInterval())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+ );
+ }
+
private static int computeNumChangedSegments(List<String> segmentIds, int[]
segmentChanges)
{
int numChangedSegments = 0;
diff --git
a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
new file mode 100644
index 00000000000..2a0d7fc05ea
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Encapsulates a {@link DataSegment} and additional metadata about it:
+ * {@link DataSegmentPlus#createdDate}: The time when the
segment was created </li>
+ * {@link DataSegmentPlus#usedStatusLastUpdatedDate}: The time when the
segments used status was last updated </li>
+ * <p>
+ * The class closesly resembles the row structure of the {@link
MetadataStorageTablesConfig#getSegmentsTable()}
+ * </p>
+ */
+@UnstableApi
+public class DataSegmentPlus
+{
+ private final DataSegment dataSegment;
+ private final DateTime createdDate;
+ @Nullable
+ private final DateTime usedStatusLastUpdatedDate;
+
+ @JsonCreator
+ public DataSegmentPlus(
+ @JsonProperty("dataSegment") final DataSegment dataSegment,
+ @JsonProperty("createdDate") final DateTime createdDate,
+ @JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime
usedStatusLastUpdatedDate
+ )
+ {
+ this.dataSegment = dataSegment;
+ this.createdDate = createdDate;
+ this.usedStatusLastUpdatedDate = usedStatusLastUpdatedDate;
+ }
+
+ @JsonProperty
+ public DateTime getCreatedDate()
+ {
+ return createdDate;
+ }
+
+ @Nullable
+ @JsonProperty
+ public DateTime getUsedStatusLastUpdatedDate()
+ {
+ return usedStatusLastUpdatedDate;
+ }
+
+ @JsonProperty
+ public DataSegment getDataSegment()
+ {
+ return dataSegment;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataSegmentPlus that = (DataSegmentPlus) o;
+ return Objects.equals(dataSegment, that.getDataSegment())
+ && Objects.equals(createdDate, that.getCreatedDate())
+ && Objects.equals(usedStatusLastUpdatedDate,
that.getUsedStatusLastUpdatedDate());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(
+ dataSegment,
+ createdDate,
+ usedStatusLastUpdatedDate
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DataSegmentPlus{" +
+ "createdDate=" + getCreatedDate() +
+ ", usedStatusLastUpdatedDate=" + getUsedStatusLastUpdatedDate() +
+ ", dataSegment=" + getDataSegment() +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index fb976a04bf3..c53a998e238 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -363,7 +363,7 @@ public class MetadataResource
SortOrder theSortOrder = sortOrder == null ? null :
SortOrder.fromValue(sortOrder);
final Interval theInterval = interval != null ?
Intervals.of(interval.replace('_', '/')) : null;
- Iterable<DataSegment> unusedSegments =
segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
+ Iterable<DataSegmentPlus> unusedSegments =
segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
dataSource,
theInterval,
limit,
@@ -371,13 +371,13 @@ public class MetadataResource
theSortOrder
);
- final Function<DataSegment, Iterable<ResourceAction>> raGenerator =
segment -> Collections.singletonList(
-
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+ final Function<DataSegmentPlus, Iterable<ResourceAction>> raGenerator =
segment -> Collections.singletonList(
+
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
- final Iterable<DataSegment> authorizedSegments =
+ final Iterable<DataSegmentPlus> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(req, unusedSegments,
raGenerator, authorizerMapper);
- final List<DataSegment> retVal = new ArrayList<>();
+ final List<DataSegmentPlus> retVal = new ArrayList<>();
authorizedSegments.iterator().forEachRemaining(retVal::add);
return Response.status(Response.Status.OK).entity(retVal).build();
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0626792f1fd..da7bf422692 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -42,7 +42,9 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
@@ -84,6 +86,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class IndexerSQLMetadataStorageCoordinatorTest
@@ -1240,7 +1243,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1251,13 +1255,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
ImmutableList.of(),
@@ -1268,13 +1283,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(2033,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
String lastSegmentId = segments.get(9).getId().toString();
final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
@@ -1290,6 +1316,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));
+ ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ null,
+ null
+ );
+ Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(expectedSegmentsAscOrder,
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
actualUnusedSegments = retrieveUnusedSegments(
ImmutableList.of(),
null,
@@ -1300,6 +1336,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
+ actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ SortOrder.ASC,
+ null
+ );
+ Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegmentsPlus.size());
+ verifyEqualsAllSegmentsPlus(expectedSegmentsAscOrder,
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
+
final List<DataSegment> expectedSegmentsDescOrder = segments.stream()
.filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0)
.collect(Collectors.toList());
@@ -1314,13 +1360,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(expectedSegmentsDescOrder.size(),
actualUnusedSegments.size());
Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
+
+ actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ SortOrder.DESC,
+ null
+ );
+ Assert.assertEquals(expectedSegmentsDescOrder.size(),
actualUnusedSegmentsPlus.size());
+ verifyEqualsAllSegmentsPlus(expectedSegmentsDescOrder,
actualUnusedSegmentsPlus, usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1331,13 +1388,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ segments.size(),
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final int requestedLimit = segments.size() - 1;
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
@@ -1347,18 +1415,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null,
null
);
+ final List<DataSegment> expectedSegments =
segments.stream().limit(requestedLimit).collect(Collectors.toList());
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
-
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+ Assert.assertTrue(actualUnusedSegments.containsAll(expectedSegments));
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ requestedLimit,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(requestedLimit, actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(2034,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final int requestedLimit = segments.size();
final String lastSegmentId = segments.get(4).getId().toString();
+ final List<DataSegment> expectedSegments = segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+ .limit(requestedLimit)
+ .collect(Collectors.toList());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
@@ -1367,20 +1451,32 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null
);
Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
- Assert.assertEquals(actualUnusedSegments, segments.stream()
- .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
- .limit(requestedLimit)
- .collect(Collectors.toList()));
+ Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(),
+ requestedLimit,
+ lastSegmentId,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size() - 5, actualUnusedSegmentsPlus.size());
+ verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final int requestedLimit = segments.size() - 1;
final String lastSegmentId = segments.get(4).getId().toString();
+ final List<DataSegment> expectedSegments = segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+ .limit(requestedLimit)
+ .collect(Collectors.toList());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
@@ -1389,17 +1485,25 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null
);
Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
- Assert.assertEquals(actualUnusedSegments, segments.stream()
- .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
- .limit(requestedLimit)
- .collect(Collectors.toList()));
+ Assert.assertEquals(actualUnusedSegments, expectedSegments);
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ requestedLimit,
+ lastSegmentId,
+ null,
+ null
+ );
+ Assert.assertEquals(requestedLimit - 4, actualUnusedSegmentsPlus.size());
+ verifyEqualsAllSegmentsPlus(expectedSegments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
@@ -1410,6 +1514,16 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ segments.size() + 1,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus.size());
+ verifyContainsAllSegmentsPlus(segments, actualUnusedSegmentsPlus,
usedStatusLastUpdatedTime);
}
@Test
@@ -1430,13 +1544,23 @@ public class IndexerSQLMetadataStorageCoordinatorTest
null
);
Assert.assertEquals(0, actualUnusedSegments.size());
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(outOfRangeInterval),
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
}
@Test
public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
- markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+ DateTime usedStatusLastUpdatedTime = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(segments), usedStatusLastUpdatedTime);
final Interval interval = Intervals.of("1905/1920");
@@ -1449,6 +1573,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(5, actualUnusedSegments1.size());
+ ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ DateTimes.nowUtc()
+ );
+ Assert.assertEquals(5, actualUnusedSegmentsPlus.size());
+
final ImmutableList<DataSegment> actualUnusedSegments2 =
retrieveUnusedSegments(
ImmutableList.of(interval),
null,
@@ -1457,6 +1590,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
DateTimes.nowUtc().minusHours(1)
);
Assert.assertEquals(0, actualUnusedSegments2.size());
+
+ actualUnusedSegmentsPlus = retrieveUnusedSegmentsPlus(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ DateTimes.nowUtc().minusHours(1)
+ );
+ Assert.assertEquals(0, actualUnusedSegmentsPlus.size());
}
@Test
@@ -1492,6 +1634,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size());
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus1 =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ maxUsedStatusLastUpdatedTime1
+ );
+ Assert.assertEquals(oddYearSegments.size(),
actualUnusedSegmentsPlus1.size());
+
final ImmutableList<DataSegment> actualUnusedSegments2 =
retrieveUnusedSegments(
ImmutableList.of(interval),
null,
@@ -1500,6 +1651,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest
maxUsedStatusLastUpdatedTime2
);
Assert.assertEquals(segments.size(), actualUnusedSegments2.size());
+
+ final ImmutableList<DataSegmentPlus> actualUnusedSegmentsPlus2 =
retrieveUnusedSegmentsPlus(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ maxUsedStatusLastUpdatedTime2
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegmentsPlus2.size());
}
@Test
@@ -3343,6 +3503,64 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ private ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
+ final List<Interval> intervals,
+ final Integer limit,
+ final String lastSegmentId,
+ final SortOrder sortOrder,
+ final DateTime maxUsedStatusLastUpdatedTime
+ )
+ {
+ return derbyConnector.inReadOnlyTransaction(
+ (handle, status) -> {
+ try (final CloseableIterator<DataSegmentPlus> iterator =
+ SqlSegmentsMetadataQuery.forHandle(
+ handle,
+ derbyConnector,
+
derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, limit,
lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+ return ImmutableList.copyOf(iterator);
+ }
+ }
+ );
+ }
+
+ private void verifyContainsAllSegmentsPlus(
+ List<DataSegment> expectedSegments,
+ List<DataSegmentPlus> actualUnusedSegmentsPlus,
+ DateTime usedStatusLastUpdatedTime)
+ {
+ Map<SegmentId, DataSegment> expectedIdToSegment =
expectedSegments.stream().collect(Collectors.toMap(DataSegment::getId,
Function.identity()));
+ Map<SegmentId, DataSegmentPlus> actualIdToSegmentPlus =
actualUnusedSegmentsPlus.stream()
+ .collect(Collectors.toMap(d -> d.getDataSegment().getId(),
Function.identity()));
+ Assert.assertTrue(expectedIdToSegment.entrySet().stream().allMatch(e -> {
+ DataSegmentPlus segmentPlus = actualIdToSegmentPlus.get(e.getKey());
+ return segmentPlus != null
+ &&
!segmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+ && segmentPlus.getUsedStatusLastUpdatedDate() != null
+ &&
segmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime);
+ }));
+ }
+
+ private void verifyEqualsAllSegmentsPlus(
+ List<DataSegment> expectedSegments,
+ List<DataSegmentPlus> actualUnusedSegmentsPlus,
+ DateTime usedStatusLastUpdatedTime
+ )
+ {
+ Assert.assertEquals(expectedSegments.size(),
actualUnusedSegmentsPlus.size());
+ for (int i = 0; i < expectedSegments.size(); i++) {
+ DataSegment expectedSegment = expectedSegments.get(i);
+ DataSegmentPlus actualSegmentPlus = actualUnusedSegmentsPlus.get(i);
+ Assert.assertEquals(expectedSegment.getId(),
actualSegmentPlus.getDataSegment().getId());
+
Assert.assertTrue(!actualSegmentPlus.getCreatedDate().isAfter(usedStatusLastUpdatedTime)
+ && actualSegmentPlus.getUsedStatusLastUpdatedDate() !=
null
+ &&
actualSegmentPlus.getUsedStatusLastUpdatedDate().equals(usedStatusLastUpdatedTime));
+ }
+ }
+
/**
* This test-only shard type is to test the behavior of "old generation"
tombstones with 1 core partition.
*/
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 1e2f2d462c4..92a40b9a443 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SortOrder;
+import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
@@ -194,7 +195,7 @@ public class TestSegmentsMetadataManager implements
SegmentsMetadataManager
}
@Override
- public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
diff --git
a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
new file mode 100644
index 00000000000..f2c9a68a8d5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class DataSegmentPlusTest
+{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+ private static final int TEST_VERSION = 0x9;
+
+ @Before
+ public void setUp()
+ {
+ InjectableValues.Std injectableValues = new InjectableValues.Std();
+ injectableValues.addValue(DataSegment.PruneSpecsHolder.class,
DataSegment.PruneSpecsHolder.DEFAULT);
+ MAPPER.setInjectableValues(injectableValues);
+ }
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(DataSegmentPlus.class)
+ .withNonnullFields("dataSegment", "createdDate")
+ .usingGetClass()
+ .verify();
+ }
+
+ @Test
+ public void testSerde() throws JsonProcessingException
+ {
+ final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+ final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something",
"or_other");
+
+ String createdDateStr = "2024-01-20T00:00:00.701Z";
+ String usedStatusLastUpdatedDateStr = "2024-01-20T01:00:00.701Z";
+ DateTime createdDate = DateTimes.of(createdDateStr);
+ DateTime usedStatusLastUpdatedDate =
DateTimes.of(usedStatusLastUpdatedDateStr);
+ DataSegmentPlus segmentPlus = new DataSegmentPlus(
+ new DataSegment(
+ "something",
+ interval,
+ "1",
+ loadSpec,
+ Arrays.asList("dim1", "dim2"),
+ Arrays.asList("met1", "met2"),
+ new NumberedShardSpec(3, 0),
+ new CompactionState(
+ new HashedPartitionsSpec(100000, null,
ImmutableList.of("dim1")),
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1",
"bar", "foo"))
+ ),
+ ImmutableList.of(ImmutableMap.of("type", "count", "name",
"count")),
+ ImmutableMap.of("filter", ImmutableMap.of("type", "selector",
"dimension", "dim1", "value", "foo")),
+ ImmutableMap.of(),
+ ImmutableMap.of()
+ ),
+ TEST_VERSION,
+ 1
+ ),
+ createdDate,
+ usedStatusLastUpdatedDate
+ );
+
+ final Map<String, Object> objectMap = MAPPER.readValue(
+ MAPPER.writeValueAsString(segmentPlus),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ Assert.assertEquals(3, objectMap.size());
+ final Map<String, Object> segmentObjectMap = MAPPER.readValue(
+ MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+
+ // verify dataSegment
+ Assert.assertEquals(11, segmentObjectMap.size());
+ Assert.assertEquals("something", segmentObjectMap.get("dataSource"));
+ Assert.assertEquals(interval.toString(), segmentObjectMap.get("interval"));
+ Assert.assertEquals("1", segmentObjectMap.get("version"));
+ Assert.assertEquals(loadSpec, segmentObjectMap.get("loadSpec"));
+ Assert.assertEquals("dim1,dim2", segmentObjectMap.get("dimensions"));
+ Assert.assertEquals("met1,met2", segmentObjectMap.get("metrics"));
+ Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3,
"partitions", 0), segmentObjectMap.get("shardSpec"));
+ Assert.assertEquals(TEST_VERSION, segmentObjectMap.get("binaryVersion"));
+ Assert.assertEquals(1, segmentObjectMap.get("size"));
+ Assert.assertEquals(6, ((Map)
segmentObjectMap.get("lastCompactionState")).size());
+
+ // verify extra metadata
+ Assert.assertEquals(createdDateStr, objectMap.get("createdDate"));
+ Assert.assertEquals(usedStatusLastUpdatedDateStr,
objectMap.get("usedStatusLastUpdatedDate"));
+
+ DataSegmentPlus deserializedSegmentPlus =
MAPPER.readValue(MAPPER.writeValueAsString(segmentPlus), DataSegmentPlus.class);
+
+ // verify dataSegment
+ Assert.assertEquals(segmentPlus.getDataSegment().getDataSource(),
deserializedSegmentPlus.getDataSegment().getDataSource());
+ Assert.assertEquals(segmentPlus.getDataSegment().getInterval(),
deserializedSegmentPlus.getDataSegment().getInterval());
+ Assert.assertEquals(segmentPlus.getDataSegment().getVersion(),
deserializedSegmentPlus.getDataSegment().getVersion());
+ Assert.assertEquals(segmentPlus.getDataSegment().getLoadSpec(),
deserializedSegmentPlus.getDataSegment().getLoadSpec());
+ Assert.assertEquals(segmentPlus.getDataSegment().getDimensions(),
deserializedSegmentPlus.getDataSegment().getDimensions());
+ Assert.assertEquals(segmentPlus.getDataSegment().getMetrics(),
deserializedSegmentPlus.getDataSegment().getMetrics());
+ Assert.assertEquals(segmentPlus.getDataSegment().getShardSpec(),
deserializedSegmentPlus.getDataSegment().getShardSpec());
+ Assert.assertEquals(segmentPlus.getDataSegment().getSize(),
deserializedSegmentPlus.getDataSegment().getSize());
+ Assert.assertEquals(segmentPlus.getDataSegment().getId(),
deserializedSegmentPlus.getDataSegment().getId());
+ Assert.assertEquals(segmentPlus.getDataSegment().getLastCompactionState(),
deserializedSegmentPlus.getDataSegment().getLastCompactionState());
+
+ // verify extra metadata
+ Assert.assertEquals(segmentPlus.getCreatedDate(),
deserializedSegmentPlus.getCreatedDate());
+ Assert.assertEquals(segmentPlus.getUsedStatusLastUpdatedDate(),
deserializedSegmentPlus.getUsedStatusLastUpdatedDate());
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 8c818c918f7..fa99a411d36 100644
---
a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -74,6 +75,10 @@ public class MetadataResourceTest
.withNumPartitions(NUM_PARTITIONS)
.eachOfSizeInMb(500)
.toArray(new DataSegment[0]);
+
+ private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
+ .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(),
DateTimes.nowUtc()))
+ .collect(Collectors.toList());
private HttpServletRequest request;
private SegmentsMetadataManager segmentsMetadataManager;
private IndexerMetadataStorageCoordinator storageCoordinator;
@@ -281,7 +286,7 @@ public class MetadataResourceTest
null,
null
);
- List<DataSegment> resultList = extractResponseList(response);
+ List<DataSegmentPlus> resultList = extractResponseList(response);
Assert.assertTrue(resultList.isEmpty());
// test valid datasource with bad limit - fails with expected invalid
limit message
@@ -302,7 +307,7 @@ public class MetadataResourceTest
response = metadataResource.getUnusedSegmentsInDataSource(request,
DATASOURCE1, null, null, null, null);
resultList = extractResponseList(response);
- Assert.assertEquals(Arrays.asList(segments), resultList);
+ Assert.assertEquals(segmentsPlus, resultList);
// test valid datasource with interval filter - returns all unused
segments for that datasource within interval
int numDays = 2;
@@ -311,7 +316,10 @@ public class MetadataResourceTest
resultList = extractResponseList(response);
Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size());
- Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2],
segments[3]), resultList);
+ Assert.assertEquals(
+ Arrays.asList(segmentsPlus.get(0), segmentsPlus.get(1),
segmentsPlus.get(2), segmentsPlus.get(3)),
+ resultList
+ );
// test valid datasource with interval filter limit and last segment id -
returns unused segments for that
// datasource within interval upto limit starting at last segment id
@@ -320,7 +328,7 @@ public class MetadataResourceTest
resultList = extractResponseList(response);
Assert.assertEquals(limit, resultList.size());
- Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]),
resultList);
+ Assert.assertEquals(Arrays.asList(segmentsPlus.get(0),
segmentsPlus.get(1), segmentsPlus.get(2)), resultList);
// test valid datasource with interval filter limit and offset - returns
unused segments for that datasource within
// interval upto limit starting at offset
@@ -334,10 +342,10 @@ public class MetadataResourceTest
);
resultList = extractResponseList(response);
- Assert.assertEquals(Collections.singletonList(segments[3]), resultList);
+ Assert.assertEquals(Collections.singletonList(segmentsPlus.get(3)),
resultList);
}
- Answer<Iterable<DataSegment>> mockIterateAllUnusedSegmentsForDatasource()
+ Answer<Iterable<DataSegmentPlus>> mockIterateAllUnusedSegmentsForDatasource()
{
return invocationOnMock -> {
String dataSourceName = invocationOnMock.getArgument(0);
@@ -349,16 +357,17 @@ public class MetadataResourceTest
return ImmutableList.of();
}
- return Arrays.stream(segments)
- .filter(d -> d.getDataSource().equals(dataSourceName)
+ return segmentsPlus.stream()
+ .filter(d ->
d.getDataSegment().getDataSource().equals(dataSourceName)
&& (interval == null
- || (d.getInterval().getStartMillis() >=
interval.getStartMillis()
- && d.getInterval().getEndMillis() <=
interval.getEndMillis()))
+ ||
(d.getDataSegment().getInterval().getStartMillis() >= interval.getStartMillis()
+ &&
d.getDataSegment().getInterval().getEndMillis() <= interval.getEndMillis()))
&& (lastSegmentId == null
- || (sortOrder == null &&
d.getId().toString().compareTo(lastSegmentId) > 0)
- || (sortOrder == SortOrder.ASC &&
d.getId().toString().compareTo(lastSegmentId) > 0)
- || (sortOrder == SortOrder.DESC &&
d.getId().toString().compareTo(lastSegmentId) < 0)))
- .sorted((o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o1.getInterval(),
o2.getInterval()))
+ || (sortOrder == null &&
d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+ || (sortOrder == SortOrder.ASC &&
d.getDataSegment().getId().toString().compareTo(lastSegmentId) > 0)
+ || (sortOrder == SortOrder.DESC &&
d.getDataSegment().getId().toString().compareTo(lastSegmentId) < 0)))
+ .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd()
+ .compare(o1.getDataSegment().getInterval(),
o2.getDataSegment().getInterval()))
.limit(limit != null
? limit
: segments.length)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]