This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ab7d9bc6ecf Add api for Retrieving unused segments (#15415)
ab7d9bc6ecf is described below
commit ab7d9bc6ecf0dbb39c0eaf5b807829c019c6aca7
Author: zachjsh <[email protected]>
AuthorDate: Mon Dec 11 16:32:18 2023 -0500
Add api for Retrieving unused segments (#15415)
### Description
This PR adds an API for retrieving unused segments for a particular
datasource. The API supports pagination via the `limit` and
`lastSegmentId` parameters. The matching segments can optionally be sorted
by `id`, `start time`, and `end time` in `ASC` or `DESC` order via the
`sortOrder` parameter; if `sortOrder` is not specified, the segments are
returned in no guaranteed order.
`GET
/druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}`
Returns a list of unused segments for a datasource in the cluster, contained
within an optionally specified interval.
Optional `limit` and `lastSegmentId` parameters can be given as well, to
limit results and enable pagination.
The results may be sorted in either ASC or DESC order by specifying the
sortOrder parameter.
`dataSourceName`: The name of the datasource.
`interval`: The interval within which to search for unused
segments.
`limit`: The maximum number of unused segments to
return information about. This parameter helps to
support pagination.
`lastSegmentId`: The segment id after which to search for results.
All segments returned are > this segment
lexicographically if sortOrder is null or
ASC, or < this segment lexicographically if sortOrder is DESC.
`sortOrder`: The order in which to return the
matching segments, by id, start time, and end time. A null
value indicates that order does not matter.
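The pagination contract described above can be sketched in plain Java. This is an illustration, not code from this PR: the segment ids are made up, and a null sortOrder is treated like ASC here purely so the sketch is deterministic, whereas the real API makes no ordering guarantee in that case.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class UnusedSegmentsPagination
{
  // Mirrors the documented contract: keep ids lexicographically after
  // lastSegmentId for ASC (or a null sortOrder), before it for DESC,
  // and cap the result at limit.
  static List<String> page(List<String> ids, Integer limit, String lastSegmentId, String sortOrder)
  {
    final boolean desc = "DESC".equalsIgnoreCase(sortOrder);
    final List<String> matched = ids.stream()
        .filter(id -> lastSegmentId == null
                      || (desc ? id.compareTo(lastSegmentId) < 0 : id.compareTo(lastSegmentId) > 0))
        .sorted(desc ? Comparator.<String>reverseOrder() : Comparator.<String>naturalOrder())
        .collect(Collectors.toList());
    return limit == null ? matched : matched.subList(0, Math.min(limit, matched.size()));
  }

  public static void main(String[] args)
  {
    final List<String> ids = List.of("seg_a", "seg_b", "seg_c", "seg_d");
    System.out.println(page(ids, 2, "seg_a", "ASC"));     // prints [seg_b, seg_c]
    System.out.println(page(ids, null, "seg_d", "DESC")); // prints [seg_c, seg_b, seg_a]
  }
}
```

A client can thus walk all pages by feeding the last id of each page back as lastSegmentId.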
This PR has:
- [x] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [x] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [x] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
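For illustration, here is a sketch of how a client might build the request URL for successive pages of this endpoint. The datasource name, interval, and segment id are made up, and this helper is not part of the PR; note that segment ids contain `:` characters and must be percent-encoded.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UnusedSegmentsUrl
{
  // Builds the unusedSegments request URL for one page; null parameters are omitted.
  static String pageUrl(String dataSource, String interval, Integer limit,
                        String lastSegmentId, String sortOrder)
  {
    final StringBuilder url = new StringBuilder("/druid/coordinator/v1/datasources/")
        .append(URLEncoder.encode(dataSource, StandardCharsets.UTF_8))
        .append("/unusedSegments?");
    if (interval != null) {
      url.append("interval=").append(URLEncoder.encode(interval, StandardCharsets.UTF_8)).append('&');
    }
    if (limit != null) {
      url.append("limit=").append(limit).append('&');
    }
    if (lastSegmentId != null) {
      url.append("lastSegmentId=").append(URLEncoder.encode(lastSegmentId, StandardCharsets.UTF_8)).append('&');
    }
    if (sortOrder != null) {
      url.append("sortOrder=").append(sortOrder).append('&');
    }
    url.setLength(url.length() - 1); // drop the trailing '&' (or '?' if no params)
    return url.toString();
  }

  public static void main(String[] args)
  {
    // First page, then a second page resuming after a made-up segment id.
    System.out.println(pageUrl("inline_data", "2023-12-01_2023-12-10", 10, null, "ASC"));
    System.out.println(pageUrl(
        "inline_data", "2023-12-01_2023-12-10", 10,
        "inline_data_2023-12-03T00:00:00.000Z_2023-12-04T00:00:00.000Z_v1", "ASC"
    ));
  }
}
```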
---
docs/api-reference/legacy-metadata-api.md | 10 ++
.../IndexerSQLMetadataStorageCoordinator.java | 2 +-
.../druid/metadata/SegmentsMetadataManager.java | 24 ++++
.../java/org/apache/druid/metadata/SortOrder.java | 66 +++++++++
.../druid/metadata/SqlSegmentsMetadataManager.java | 48 ++++++-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 73 ++++++++--
.../apache/druid/server/http/MetadataResource.java | 48 +++++++
.../IndexerSQLMetadataStorageCoordinatorTest.java | 147 ++++++++++++++++++---
.../org/apache/druid/metadata/SortOrderTest.java | 60 +++++++++
.../metadata/SqlSegmentsMetadataManagerTest.java | 1 -
.../simulate/TestSegmentsMetadataManager.java | 13 ++
.../druid/server/http/MetadataResourceTest.java | 142 +++++++++++++++++++-
website/.spelling | 2 +
13 files changed, 601 insertions(+), 35 deletions(-)
diff --git a/docs/api-reference/legacy-metadata-api.md
b/docs/api-reference/legacy-metadata-api.md
index 453159c1a58..134ede08d87 100644
--- a/docs/api-reference/legacy-metadata-api.md
+++ b/docs/api-reference/legacy-metadata-api.md
@@ -248,6 +248,16 @@ Returns full segment metadata for a specific segment in
the cluster.
Return the tiers that a datasource exists in.
+`GET
/druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}`
+
+Returns a list of unused segments for a datasource in the cluster contained
within an optionally specified interval.
+Optional limit and lastSegmentId parameters can be given as well, to limit
results and enable pagination.
+The results may be sorted in either ASC or DESC order by segment id, start
time, and end time by specifying the
+sortOrder parameter. If none of the optional parameters is given, all
+unused segments for the given datasource are returned in no guaranteed order.
+
+Example usage: `GET
/druid/coordinator/v1/datasources/inline_data/unusedSegments?interval=2023-12-01_2023-12-10&limit=10&lastSegmentId=inline_data_2023-12-03T00%3A00%3A00.000Z_2023-12-04T00%3A00%3A00.000Z_2023-12-09T14%3A16%3A53.738Z&sortOrder=ASC`
+
## Intervals
Note that all _interval_ URL parameters are ISO 8601 strings delimited by a
`_` instead of a `/` as in `2016-06-27_2016-06-28`.
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 62f55f96c47..c62e59c0b25 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -233,7 +233,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector,
dbTables, jsonMapper)
- .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval), limit)) {
+ .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval), limit, null, null)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index a774afcd47b..eb8a36bc3a7 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -125,6 +125,30 @@ public interface SegmentsMetadataManager
boolean requiresLatest
);
+ /**
+ * Returns an iterable to go over unused segments for a given datasource
over an optional interval.
+ * The order in which segments are iterated is from earliest start-time,
with ties being broken with earliest end-time
+ * first. Note: the iteration may not be as trivially cheap as,
+ * for example, iteration over an ArrayList. Try (to some reasonable extent)
to organize the code so that it
+ * iterates the returned iterable only once rather than several times.
+ *
+ * @param datasource the name of the datasource.
+ * @param interval an optional interval to search over. If none is
specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
+ * @param limit an optional maximum number of results to return. If
none is specified, the results are not limited.
+ * @param lastSegmentId an optional last segment id from which to search for
results. All segments returned are >
+ * this segment lexicographically if sortOrder is null or
{@link SortOrder#ASC}, or < this segment
+ * lexicographically if sortOrder is {@link
SortOrder#DESC}. If none is specified, no such filter is used.
+ * @param sortOrder an optional order with which to return the matching
segments by id, start time, end time.
+ * If none is specified, the order of the results is
not guaranteed.
+ */
+ Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ String datasource,
+ @Nullable Interval interval,
+ @Nullable Integer limit,
+ @Nullable String lastSegmentId,
+ @Nullable SortOrder sortOrder
+ );
+
/**
* Retrieves all data source names for which there are segment in the
database, regardless of whether those segments
* are used or not. If there are no segments in the database, returns an
empty set.
diff --git a/server/src/main/java/org/apache/druid/metadata/SortOrder.java
b/server/src/main/java/org/apache/druid/metadata/SortOrder.java
new file mode 100644
index 00000000000..afabd0cde59
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/SortOrder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Specifies the sort order when doing metadata store queries.
+ */
+public enum SortOrder
+{
+ ASC("ASC"),
+
+ DESC("DESC");
+
+ private String value;
+
+ SortOrder(String value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ @JsonValue
+ public String toString()
+ {
+ return String.valueOf(value);
+ }
+
+ @JsonCreator
+ public static SortOrder fromValue(String value)
+ {
+ for (SortOrder b : SortOrder.values()) {
+ if (String.valueOf(b.value).equalsIgnoreCase(String.valueOf(value))) {
+ return b;
+ }
+ }
+ throw InvalidInput.exception(StringUtils.format(
+ "Unexpected value[%s] for SortOrder. Possible values are: %s",
+ value,
Arrays.stream(SortOrder.values()).map(SortOrder::toString).collect(Collectors.toList())
+ ));
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 14c6ef6c1fc..12d43ec5b76 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
@@ -686,7 +687,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
}
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(dataSourceName, intervals,
null)) {
+ queryTool.retrieveUnusedSegments(dataSourceName, intervals,
null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -955,6 +956,51 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
.transform(timeline ->
timeline.findNonOvershadowedObjectsInInterval(interval,
Partitions.ONLY_COMPLETE));
}
+ /**
+ * Retrieves segments for a given datasource that are marked unused and that
are *fully contained by* an optionally
+ * specified interval. If the interval specified is null, this method will
retrieve all unused segments.
+ *
+ * This call does not return any information about realtime segments.
+ *
+ * @param datasource The name of the datasource
+ * @param interval an optional interval to search over.
+ * @param limit an optional maximum number of results to return.
If none is specified, the results are
+ * not limited.
+ * @param lastSegmentId an optional last segment id from which to search for
results. All segments returned are >
+ * this segment lexicographically if sortOrder is null or
{@link SortOrder#ASC}, or < this
+ * segment lexicographically if sortOrder is {@link
SortOrder#DESC}. If none is specified, no
+ * such filter is used.
+ * @param sortOrder an optional order with which to return the matching
segments by id, start time, end time. If
+ * none is specified, the order of the results is not
guaranteed.
+ *
+ * @return an iterable over the matching unused segments.
+ */
+ @Override
+ public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ final String datasource,
+ @Nullable final Interval interval,
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder
+ )
+ {
+ return connector.inReadOnlyTransaction(
+ (handle, status) -> {
+ final SqlSegmentsMetadataQuery queryTool =
+ SqlSegmentsMetadataQuery.forHandle(handle, connector,
dbTables.get(), jsonMapper);
+
+ final List<Interval> intervals =
+ interval == null
+ ? Intervals.ONLY_ETERNITY
+ : Collections.singletonList(interval);
+ try (final CloseableIterator<DataSegment> iterator =
+ queryTool.retrieveUnusedSegments(datasource, intervals,
limit, lastSegmentId, sortOrder)) {
+ return ImmutableList.copyOf(iterator);
+ }
+ }
+ );
+ }
+
@Override
public Set<String> retrieveAllDataSourceNames()
{
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 61fc919a8be..63bfdbf53de 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -117,7 +117,7 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS,
true, null);
+ return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS,
true, null, null, null);
}
/**
@@ -127,15 +127,26 @@ public class SqlSegmentsMetadataQuery
*
* This call does not return any information about realtime segments.
*
+ * @param dataSource The name of the datasource
+ * @param intervals The intervals to search over
+ * @param limit The limit of segments to return
+ * @param lastSegmentId the last segment id from which to search for
results. All segments returned are >
+ * this segment lexicographically if sortOrder is null or
ASC, or < this segment
+ * lexicographically if sortOrder is DESC.
+ * @param sortOrder Specifies the order with which to return the
matching segments by start time, end time.
+ * A null value indicates that order does not matter.
+
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals,
- @Nullable final Integer limit
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS,
false, limit);
+ return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS,
false, limit, lastSegmentId, sortOrder);
}
/**
@@ -223,7 +234,15 @@ public class SqlSegmentsMetadataQuery
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
- retrieveSegments(dataSource,
Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
+ retrieveSegments(
+ dataSource,
+ Collections.singletonList(interval),
+ IntervalMode.CONTAINS,
+ true,
+ null,
+ null,
+ null
+ ),
DataSegment::getId
)
);
@@ -358,12 +377,14 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
- @Nullable final Integer limit
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder
)
{
- if (intervals.isEmpty()) {
+ if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
- retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode,
used, limit)
+ retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode,
used, limit, lastSegmentId, sortOrder)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -371,7 +392,15 @@ public class SqlSegmentsMetadataQuery
Integer limitPerBatch = limit;
for (final List<Interval> intervalList : intervalsLists) {
- final UnmodifiableIterator<DataSegment> iterator =
retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used,
limitPerBatch);
+ final UnmodifiableIterator<DataSegment> iterator =
retrieveSegmentsInIntervalsBatch(
+ dataSource,
+ intervalList,
+ matchMode,
+ used,
+ limitPerBatch,
+ lastSegmentId,
+ sortOrder
+ );
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent
batches or circuit break if
// we have reached what was requested for.
@@ -394,7 +423,9 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
- @Nullable final Integer limit
+ @Nullable final Integer limit,
+ @Nullable final String lastSegmentId,
+ @Nullable final SortOrder sortOrder
)
{
// Check if the intervals all support comparing as strings. If so, bake
them into the SQL.
@@ -407,11 +438,33 @@ public class SqlSegmentsMetadataQuery
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode,
connector);
}
+ if (lastSegmentId != null) {
+ sb.append(
+ StringUtils.format(
+ " AND id %s :id",
+ (sortOrder == null || sortOrder == SortOrder.ASC)
+ ? ">"
+ : "<"
+ )
+ );
+ }
+
+ if (sortOrder != null) {
+ sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s
%2$s",
+ connector.getQuoteString(),
+ sortOrder.toString()));
+ }
final Query<Map<String, Object>> sql = handle
- .createQuery(StringUtils.format(sb.toString(),
dbTables.getSegmentsTable()))
+ .createQuery(StringUtils.format(
+ sb.toString(),
+ dbTables.getSegmentsTable()
+ ))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
+ if (lastSegmentId != null) {
+ sql.bind("id", lastSegmentId);
+ }
if (null != limit) {
sql.setMaxRows(limit);
}
diff --git
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 3fc13469723..fb976a04bf3 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -26,9 +26,12 @@ import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SortOrder;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.DataSourceInformation;
@@ -334,6 +337,51 @@ public class MetadataResource
return builder.entity(Collections2.transform(segments,
DataSegment::getId)).build();
}
+ @GET
+ @Path("/datasources/{dataSourceName}/unusedSegments")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getUnusedSegmentsInDataSource(
+ @Context final HttpServletRequest req,
+ @PathParam("dataSourceName") final String dataSource,
+ @QueryParam("interval") @Nullable String interval,
+ @QueryParam("limit") @Nullable Integer limit,
+ @QueryParam("lastSegmentId") @Nullable final String lastSegmentId,
+ @QueryParam("sortOrder") @Nullable final String sortOrder
+ )
+ {
+ if (dataSource == null || dataSource.isEmpty()) {
+ throw InvalidInput.exception("dataSourceName must be non-empty");
+ }
+ if (limit != null && limit < 0) {
+ throw InvalidInput.exception("Invalid limit[%s] specified. Limit must be
> 0", limit);
+ }
+
+ if (lastSegmentId != null && SegmentId.tryParse(dataSource, lastSegmentId)
== null) {
+ throw InvalidInput.exception("Invalid lastSegmentId[%s] specified.",
lastSegmentId);
+ }
+
+ SortOrder theSortOrder = sortOrder == null ? null :
SortOrder.fromValue(sortOrder);
+
+ final Interval theInterval = interval != null ?
Intervals.of(interval.replace('_', '/')) : null;
+ Iterable<DataSegment> unusedSegments =
segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
+ dataSource,
+ theInterval,
+ limit,
+ lastSegmentId,
+ theSortOrder
+ );
+
+ final Function<DataSegment, Iterable<ResourceAction>> raGenerator =
segment -> Collections.singletonList(
+
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+ final Iterable<DataSegment> authorizedSegments =
+ AuthorizationUtils.filterAuthorizedResources(req, unusedSegments,
raGenerator, authorizerMapper);
+
+ final List<DataSegment> retVal = new ArrayList<>();
+ authorizedSegments.iterator().forEachRemaining(retVal::add);
+ return Response.status(Response.Status.OK).entity(retVal).build();
+ }
+
@GET
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 8e2e7eb747f..9e977dec3e8 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1206,23 +1206,86 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
markAllSegmentsUnused(new HashSet<>(segments));
- final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
+ ImmutableList.of(),
+ null,
+ null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
}
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(2033,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ String lastSegmentId = segments.get(9).getId().toString();
+ final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+ .collect(Collectors.toList());
+ ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ null
+ );
+ Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
+
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments));
+
+ actualUnusedSegments = retrieveUnusedSegments(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ SortOrder.ASC
+ );
+ Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
+ Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
+
+ final List<DataSegment> expectedSegmentsDescOrder = segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0)
+ .collect(Collectors.toList());
+ Collections.reverse(expectedSegmentsDescOrder);
+
+ actualUnusedSegments = retrieveUnusedSegments(
+ ImmutableList.of(),
+ null,
+ lastSegmentId,
+ SortOrder.DESC
+ );
+ Assert.assertEquals(expectedSegmentsDescOrder.size(),
actualUnusedSegments.size());
+ Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
+ }
+
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
markAllSegmentsUnused(new HashSet<>(segments));
- final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
- segments.size()
+ segments.size(),
+ null,
+ null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
@@ -1235,23 +1298,69 @@ public class IndexerSQLMetadataStorageCoordinatorTest
markAllSegmentsUnused(new HashSet<>(segments));
final int requestedLimit = segments.size() - 1;
- final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
-
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
- requestedLimit
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ requestedLimit,
+ null,
+ null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
}
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndOffsetInRange()
throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(2034,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int requestedLimit = segments.size();
+ final String lastSegmentId = segments.get(4).getId().toString();
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ requestedLimit,
+ lastSegmentId,
+ null
+ );
+ Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
+ Assert.assertEquals(actualUnusedSegments, segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+ .limit(requestedLimit)
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffsetInRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int requestedLimit = segments.size() - 1;
+ final String lastSegmentId = segments.get(4).getId().toString();
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ requestedLimit,
+ lastSegmentId,
+ null
+ );
+ Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
+ Assert.assertEquals(actualUnusedSegments, segments.stream()
+ .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0)
+ .limit(requestedLimit)
+ .collect(Collectors.toList()));
+ }
+
@Test
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
markAllSegmentsUnused(new HashSet<>(segments));
- final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
- segments.size() + 1
+ segments.size() + 1,
+ null,
+ null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
@@ -1267,8 +1376,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(segments.stream()
.anyMatch(segment ->
!segment.getInterval().overlaps(outOfRangeInterval)));
- final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
ImmutableList.of(outOfRangeInterval),
+ null,
+ null,
null
);
Assert.assertEquals(0, actualUnusedSegments.size());
@@ -3048,21 +3159,23 @@ public class IndexerSQLMetadataStorageCoordinatorTest
return segments;
}
- private ImmutableList<DataSegment>
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ private ImmutableList<DataSegment> retrieveUnusedSegments(
final List<Interval> intervals,
- final Integer limit
+ final Integer limit,
+ final String lastSegmentId,
+ final SortOrder sortOrder
)
{
return derbyConnector.inReadOnlyTransaction(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(
- handle,
- derbyConnector,
-
derbyConnectorRule.metadataTablesConfigSupplier().get(),
- mapper
- )
- .retrieveUnusedSegments(DS.WIKI,
intervals, limit)) {
+ handle,
+ derbyConnector,
+
derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ .retrieveUnusedSegments(DS.WIKI, intervals, limit,
lastSegmentId, sortOrder)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git a/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java
b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java
new file mode 100644
index 00000000000..a191763d444
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SortOrderTest
+{
+
+ @Test
+ public void testAsc()
+ {
+ Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("asc"));
+ Assert.assertEquals("ASC", SortOrder.fromValue("asc").toString());
+ Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("ASC"));
+ Assert.assertEquals("ASC", SortOrder.fromValue("ASC").toString());
+ Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("AsC"));
+ Assert.assertEquals("ASC", SortOrder.fromValue("AsC").toString());
+ }
+
+ @Test
+ public void testDesc()
+ {
+ Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("desc"));
+ Assert.assertEquals("DESC", SortOrder.fromValue("desc").toString());
+ Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DESC"));
+ Assert.assertEquals("DESC", SortOrder.fromValue("DESC").toString());
+ Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DesC"));
+ Assert.assertEquals("DESC", SortOrder.fromValue("DesC").toString());
+ }
+
+ @Test
+ public void testInvalid()
+ {
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "Unexpected value[bad] for SortOrder. Possible values are: [ASC, DESC]"
+ ).assertThrowsAndMatches(
+ () -> SortOrder.fromValue("bad")
+ );
+ }
+}
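The new `SortOrderTest` above pins down the expected round-trip behavior of `SortOrder.fromValue`: parsing is case-insensitive and `toString` yields the canonical upper-case name. As a hedged sketch only (the enum itself is not shown in this diff, and the real implementation reports bad input through `DruidException`/`InvalidInput` rather than the `IllegalArgumentException` stand-in used here), an enum consistent with these tests could look like:

```java
// Minimal sketch of a SortOrder enum consistent with SortOrderTest above.
// NOTE: illustration only, not Druid's actual class; the real fromValue
// raises a DruidException via the invalid-input machinery.
enum SortOrder
{
  ASC,
  DESC;

  // Case-insensitive parse: "asc", "ASC", and "AsC" all map to ASC.
  static SortOrder fromValue(String value)
  {
    for (SortOrder order : values()) {
      if (order.name().equalsIgnoreCase(value)) {
        return order;
      }
    }
    throw new IllegalArgumentException(
        "Unexpected value[" + value + "] for SortOrder. Possible values are: [ASC, DESC]"
    );
  }
}
```

Relying on the enum's default `toString()` (the constant name) keeps the canonical form in one place, which is exactly what the `"ASC"`/`"DESC"` assertions in the test exercise.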
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index 7a23234761e..c1a5dbbdac0 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -56,7 +56,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
public class SqlSegmentsMetadataManagerTest
{
private static DataSegment createSegment(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index 76493dfdfce..cfb8fec941e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SortOrder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
@@ -192,6 +193,18 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
));
}
+ @Override
+ public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
+ String datasource,
+ @Nullable Interval interval,
+ @Nullable Integer limit,
+ @Nullable String lastSegmentId,
+ @Nullable SortOrder sortOrder
+ )
+ {
+ return null;
+ }
+
@Override
public Set<String> retrieveAllDataSourceNames()
{
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index cb6e9d2a37e..b1c8f0cb6de 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -26,9 +26,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SortOrder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
@@ -42,28 +46,36 @@ import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
+
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class MetadataResourceTest
{
private static final String DATASOURCE1 = "datasource1";
-
+ private static final String SEGMENT_START_INTERVAL = "2012-10-24";
+ private static final int NUM_PARTITIONS = 2;
private final DataSegment[] segments =
CreateDataSegments.ofDatasource(DATASOURCE1)
- .forIntervals(3, Granularities.DAY)
- .withNumPartitions(2)
- .eachOfSizeInMb(500)
- .toArray(new DataSegment[0]);
+ .startingAt(SEGMENT_START_INTERVAL)
+ .forIntervals(3, Granularities.DAY)
+ .withNumPartitions(NUM_PARTITIONS)
+ .eachOfSizeInMb(500)
+ .toArray(new DataSegment[0]);
private HttpServletRequest request;
private SegmentsMetadataManager segmentsMetadataManager;
private IndexerMetadataStorageCoordinator storageCoordinator;
@@ -236,6 +248,126 @@ public class MetadataResourceTest
     Assert.assertEquals(new SegmentStatusInCluster(realTimeSegments[1], false, null, 40L, true), resultList.get(5));
   }
+  @Test
+  public void testGetUnusedSegmentsInDataSource()
+  {
+    Mockito.doAnswer(mockIterateAllUnusedSegmentsForDatasource())
+           .when(segmentsMetadataManager)
+           .iterateAllUnusedSegmentsForDatasource(
+               ArgumentMatchers.any(),
+               ArgumentMatchers.any(),
+               ArgumentMatchers.any(),
+               ArgumentMatchers.any(),
+               ArgumentMatchers.any());
+
+    // test with null datasource name - fails with expected bad datasource name error
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        "dataSourceName must be non-empty"
+    ).assertThrowsAndMatches(
+        () -> metadataResource.getUnusedSegmentsInDataSource(request, null, null, null, null, null)
+    );
+
+    // test with empty datasource name - fails with expected bad datasource name error
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        "dataSourceName must be non-empty"
+    ).assertThrowsAndMatches(
+        () -> metadataResource.getUnusedSegmentsInDataSource(request, "", null, null, null, null)
+    );
+
+    // test invalid datasource - returns empty segments
+    Response response = metadataResource.getUnusedSegmentsInDataSource(
+        request,
+        "invalid_datasource",
+        null,
+        null,
+        null,
+        null
+    );
+    List<DataSegment> resultList = extractResponseList(response);
+    Assert.assertTrue(resultList.isEmpty());
+
+    // test valid datasource with bad limit - fails with expected invalid limit message
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        StringUtils.format("Invalid limit[%s] specified. Limit must be > 0", -1)
+    ).assertThrowsAndMatches(
+        () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, -1, null, null)
+    );
+
+    // test valid datasource with invalid lastSegmentId - fails with expected invalid lastSegmentId message
+    DruidExceptionMatcher.invalidInput().expectMessageIs(
+        StringUtils.format("Invalid lastSegmentId[%s] specified.", "invalid")
+    ).assertThrowsAndMatches(
+        () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, "invalid", null)
+    );
+
+    // test valid datasource - returns all unused segments for that datasource
+    response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, null, null);
+    resultList = extractResponseList(response);
+    Assert.assertEquals(Arrays.asList(segments), resultList);
+
+    // test valid datasource with interval filter - returns all unused segments
+    // for that datasource within the interval
+    int numDays = 2;
+    String interval = SEGMENT_START_INTERVAL + "_P" + numDays + "D";
+    response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, null, null, null);
+    resultList = extractResponseList(response);
+    Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size());
+    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2], segments[3]), resultList);
+
+    // test valid datasource with interval filter and limit - returns unused segments
+    // for that datasource within the interval, up to the limit
+    int limit = 3;
+    response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, limit, null, null);
+    resultList = extractResponseList(response);
+    Assert.assertEquals(limit, resultList.size());
+    Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]), resultList);
+
+    // test valid datasource with interval filter, limit, and lastSegmentId - returns unused segments
+    // for that datasource within the interval, up to the limit, starting after lastSegmentId
+    response = metadataResource.getUnusedSegmentsInDataSource(
+        request,
+        DATASOURCE1,
+        interval,
+        limit,
+        segments[2].getId().toString(),
+        null
+    );
+    resultList = extractResponseList(response);
+    Assert.assertEquals(Collections.singletonList(segments[3]), resultList);
+  }
+
+  Answer<Iterable<DataSegment>> mockIterateAllUnusedSegmentsForDatasource()
+  {
+    return invocationOnMock -> {
+      String dataSourceName = invocationOnMock.getArgument(0);
+      Interval interval = invocationOnMock.getArgument(1);
+      Integer limit = invocationOnMock.getArgument(2);
+      String lastSegmentId = invocationOnMock.getArgument(3);
+      SortOrder sortOrder = invocationOnMock.getArgument(4);
+      if (!DATASOURCE1.equals(dataSourceName)) {
+        return ImmutableList.of();
+      }
+
+      return Arrays.stream(segments)
+                   .filter(d -> d.getDataSource().equals(dataSourceName)
+                                && (interval == null
+                                    || (d.getInterval().getStartMillis() >= interval.getStartMillis()
+                                        && d.getInterval().getEndMillis() <= interval.getEndMillis()))
+                                && (lastSegmentId == null
+                                    || (sortOrder == null && d.getId().toString().compareTo(lastSegmentId) > 0)
+                                    || (sortOrder == SortOrder.ASC && d.getId().toString().compareTo(lastSegmentId) > 0)
+                                    || (sortOrder == SortOrder.DESC && d.getId().toString().compareTo(lastSegmentId) < 0)))
+                   .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval()))
+                   .limit(limit != null ? limit : segments.length)
+                   .collect(Collectors.toList());
+    };
+  }
+
@Test
public void testGetDataSourceInformation()
{
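The `lastSegmentId`/`sortOrder` handling that the mocked test above verifies amounts to keyset pagination: each request returns at most `limit` segments, and the caller passes the id of the last segment received to get the next page. As a sketch of how a client might drain the paginated endpoint (`fetchPage` here is a hypothetical stand-in for an HTTP call to the new unusedSegments API, not part of this patch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Keyset-pagination sketch mirroring the lastSegmentId semantics tested above:
// request up to `limit` segment ids, then pass the last id returned as
// lastSegmentId for the next page until a short page signals the end.
class UnusedSegmentPager
{
  static List<String> fetchAll(BiFunction<String, Integer, List<String>> fetchPage, int limit)
  {
    List<String> all = new ArrayList<>();
    String lastSegmentId = null;  // null means start from the beginning
    while (true) {
      // fetchPage(lastSegmentId, limit) models one GET against the endpoint
      List<String> page = fetchPage.apply(lastSegmentId, limit);
      all.addAll(page);
      if (page.size() < limit) {
        return all;  // a short (or empty) page means no more results
      }
      lastSegmentId = page.get(page.size() - 1);
    }
  }
}
```

With the default (null or `ASC`) sort order, every returned id compares lexicographically greater than `lastSegmentId`, so no segment is repeated or skipped across pages even if segments are deleted between requests.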
diff --git a/website/.spelling b/website/.spelling
index 5b21a4c5bd2..cad92260fbd 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -373,6 +373,7 @@ kubernetes
kubexit
k8s
laning
+lastSegmentId
lifecycle
localhost
log4j
@@ -501,6 +502,7 @@ Smoosh
smoosh
smooshed
snapshotting
+sortOrder
splittable
ssl
sslmode