This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fa8e5114921 Add versions to `markUsed` and `markUnused` APIs (#16141)
fa8e5114921 is described below
commit fa8e51149216a367e9b303961c784a99e69de138
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Tue Mar 19 09:22:25 2024 -0700
Add versions to `markUsed` and `markUnused` APIs (#16141)
* Mark used and unused APIs by versions.
* remove the conditional invocations.
* isValid() and test updates.
* isValid() and tests.
* Remove warning logs for invalid user requests. Also, downgrade visibility.
* Update resp message, etc.
* tests and some cleanup.
* Docs draft
* Clarify docs
* Update
server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
Co-authored-by: Kashif Faraz <[email protected]>
* Review comments
* Remove default interface methods only used in tests and update docs.
* Clarify javadocs and @Nullable.
* Add more tests.
* Parameterized versions.
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
docs/api-reference/data-management-api.md | 28 +-
.../common/actions/MarkSegmentsAsUnusedAction.java | 5 +-
.../common/task/KillUnusedSegmentsTaskTest.java | 21 +-
.../druid/metadata/SegmentsMetadataManager.java | 16 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 35 ++-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 146 ++++++++---
.../druid/server/http/DataSourcesResource.java | 60 +++--
.../metadata/SqlSegmentsMetadataManagerTest.java | 257 +++++++++++++++++-
.../apache/druid/metadata/TestDerbyConnector.java | 2 +-
.../simulate/TestSegmentsMetadataManager.java | 4 +-
.../druid/server/http/DataSourcesResourceTest.java | 286 +++++++++++++++++----
11 files changed, 708 insertions(+), 152 deletions(-)
diff --git a/docs/api-reference/data-management-api.md
b/docs/api-reference/data-management-api.md
index 4adeaa8b208..754bf62f725 100644
--- a/docs/api-reference/data-management-api.md
+++ b/docs/api-reference/data-management-api.md
@@ -206,7 +206,8 @@ Marks the state of a group of segments as unused, using an
array of segment IDs
Pass the array of segment IDs or interval as a JSON object in the request body.
For the interval, specify the start and end times as ISO 8601 strings to
identify segments inclusive of the start time and exclusive of the end time.
-Druid only updates the segments completely contained within the specified
interval; partially overlapping segments are not affected.
+Optionally, specify an array of segment versions with interval. Druid updates
only the segments completely contained
+within the specified interval that match the optional list of versions;
partially overlapping segments are not affected.
#### URL
@@ -214,12 +215,13 @@ Druid only updates the segments completely contained
within the specified interv
#### Request body
-The group of segments is sent as a JSON request payload that accepts one of
the following properties:
+The group of segments is sent as a JSON request payload that accepts the
following properties:
-|Property|Description|Example|
-|----------|-------------|---------|
-|`interval`|ISO 8601 segments
interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
-|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`|
+|Property|Description|Required|Example|
+|----------|-------------|---------|---------|
+|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not
specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
+|`segmentIds`|List of segment IDs.|Yes, if `interval` is not
specified.|`["segmentId1", "segmentId2"]`|
+|`versions`|List of segment versions. Must be provided with
`interval`.|No.|`["2024-03-14T16:00:04.086Z", ""2024-03-12T16:00:04.086Z"]`|
#### Responses
@@ -306,7 +308,8 @@ Marks the state of a group of segments as used, using an
array of segment IDs or
Pass the array of segment IDs or interval as a JSON object in the request body.
For the interval, specify the start and end times as ISO 8601 strings to
identify segments inclusive of the start time and exclusive of the end time.
-Druid only updates the segments completely contained within the specified
interval; partially overlapping segments are not affected.
+Optionally, specify an array of segment versions with interval. Druid updates
only the segments completely contained
+within the specified interval that match the optional list of versions;
partially overlapping segments are not affected.
#### URL
@@ -314,12 +317,13 @@ Druid only updates the segments completely contained
within the specified interv
#### Request body
-The group of segments is sent as a JSON request payload that accepts one of
the following properties:
+The group of segments is sent as a JSON request payload that accepts the
following properties:
-|Property|Description|Example|
-|----------|-------------|---------|
-|`interval`| ISO 8601 segments
interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
-|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`|
+|Property|Description|Required|Example|
+|----------|-------------|---------|---------|
+|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not
specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`|
+|`segmentIds`|List of segment IDs.|Yes, if `interval` is not
specified.|`["segmentId1", "segmentId2"]`|
+|`versions`|List of segment versions. Must be provided with
`interval`.|No.|`["2024-03-14T16:00:04.086Z", ""2024-03-12T16:00:04.086Z"]`|
#### Responses
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
index ddf57afbc18..93cb75280fa 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
@@ -63,9 +63,8 @@ public class MarkSegmentsAsUnusedAction implements
TaskAction<Integer>
@Override
public Integer perform(Task task, TaskActionToolbox toolbox)
{
- int numMarked = toolbox.getIndexerMetadataStorageCoordinator()
- .markSegmentsAsUnusedWithinInterval(dataSource,
interval);
- return numMarked;
+ return toolbox.getIndexerMetadataStorageCoordinator()
+ .markSegmentsAsUnusedWithinInterval(dataSource, interval);
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 1e36cc825a0..382673bed4b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -387,7 +387,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
segments.size(),
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- Intervals.of("2018-01-01/2020-01-01")
+ Intervals.of("2018-01-01/2020-01-01"),
+ null
)
);
@@ -434,7 +435,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment1.getInterval()
+ segment1.getInterval(),
+ null
)
);
@@ -442,7 +444,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment4.getInterval()
+ segment4.getInterval(),
+ null
)
);
@@ -450,7 +453,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment3.getInterval()
+ segment3.getInterval(),
+ null
)
);
@@ -508,7 +512,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment1.getInterval()
+ segment1.getInterval(),
+ null
)
);
@@ -516,7 +521,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment4.getInterval()
+ segment4.getInterval(),
+ null
)
);
@@ -529,7 +535,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
1,
getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
DATA_SOURCE,
- segment3.getInterval()
+ segment3.getInterval(),
+ null
)
);
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 540eba990f2..f0b9f06425d 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -53,7 +53,13 @@ public interface SegmentsMetadataManager
*/
int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource);
- int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval
interval);
+ /**
+ * Marks non-overshadowed unused segments for the given interval and
optional list of versions
+ * as used. If versions are not specified, all versions of non-overshadowed
unused segments in the interval
+ * will be marked as used.
+ * @return Number of segments updated
+ */
+ int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval
interval, @Nullable List<String> versions);
/**
* Marks the given segment IDs as "used" only if there are not already
overshadowed
@@ -81,7 +87,13 @@ public interface SegmentsMetadataManager
*/
int markAsUnusedAllSegmentsInDataSource(String dataSource);
- int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);
+ /**
+ * Marks segments as unused that are <b>fully contained</b> in the given
interval for an optional list of versions.
+ * If versions are not specified, all versions of segments in the interval
will be marked as unused.
+ * Segments that are already marked as unused are not updated.
+ * @return The number of segments updated
+ */
+ int markAsUnusedSegmentsInInterval(String dataSource, Interval interval,
@Nullable List<String> versions);
int markSegmentsAsUnused(Set<SegmentId> segmentIds);
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 4a268a2257d..66a60e072c0 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -654,21 +654,25 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
@Override
public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String
dataSource)
{
- return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, null, null);
}
@Override
- public int markAsUsedNonOvershadowedSegmentsInInterval(final String
dataSource, final Interval interval)
+ public int markAsUsedNonOvershadowedSegmentsInInterval(
+ final String dataSource,
+ final Interval interval,
+ @Nullable final List<String> versions
+ )
{
Preconditions.checkNotNull(interval);
- return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
+ return doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions);
}
- /**
- * Implementation for both {@link
#markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is
null)
- * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
- */
- private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName,
@Nullable Interval interval)
+ private int doMarkAsUsedNonOvershadowedSegments(
+ final String dataSourceName,
+ final @Nullable Interval interval,
+ final @Nullable List<String> versions
+ )
{
final List<DataSegment> unusedSegments = new ArrayList<>();
final SegmentTimeline timeline = new SegmentTimeline();
@@ -682,12 +686,12 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
interval == null ? Intervals.ONLY_ETERNITY :
Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUsedSegments(dataSourceName, intervals)) {
+ queryTool.retrieveUsedSegments(dataSourceName, intervals,
versions)) {
timeline.addSegments(iterator);
}
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(dataSourceName, intervals,
null, null, null, null, null)) {
+ queryTool.retrieveUnusedSegments(dataSourceName, intervals,
versions, null, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -796,7 +800,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
private int markSegmentsAsUsed(final List<SegmentId> segmentIds)
{
if (segmentIds.isEmpty()) {
- log.info("No segments found to update!");
+ log.info("No segments found to mark as used.");
return 0;
}
@@ -856,13 +860,18 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
}
@Override
- public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval
interval)
+ public int markAsUnusedSegmentsInInterval(
+ final String dataSource,
+ final Interval interval,
+ @Nullable final List<String> versions
+ )
{
+ Preconditions.checkNotNull(interval);
try {
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector,
dbTables.get(), jsonMapper)
- .markSegmentsUnused(dataSourceName,
interval)
+ .markSegmentsUnused(dataSource,
interval, versions)
);
}
catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 29465cd665b..5308f9dd7fe 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -42,6 +42,8 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.Update;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -119,11 +121,24 @@ public class SqlSegmentsMetadataQuery
final String dataSource,
final Collection<Interval> intervals
)
+ {
+ return retrieveUsedSegments(dataSource, intervals, null);
+ }
+
+ /**
+ * Similar to {@link #retrieveUsedSegments}, but with an additional {@code
versions} argument. When {@code versions}
+ * is specified, all used segments in the specified {@code intervals} and
{@code versions} are retrieved.
+ */
+ public CloseableIterator<DataSegment> retrieveUsedSegments(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ final List<String> versions
+ )
{
return retrieveSegments(
dataSource,
intervals,
- null,
+ versions,
IntervalMode.OVERLAPS,
true,
null,
@@ -134,7 +149,7 @@ public class SqlSegmentsMetadataQuery
}
/**
- * Retrieves segments for a given datasource that are marked unused and that
are *fully contained by* any interval
+ * Retrieves segments for a given datasource that are marked unused and that
are <b>fully contained by</b> any interval
* in a particular collection of intervals. If the collection of intervals
is empty, this method will retrieve all
* unused segments.
* <p>
@@ -184,7 +199,7 @@ public class SqlSegmentsMetadataQuery
/**
* Similar to {@link #retrieveUnusedSegments}, but also retrieves associated
metadata for the segments for a given
- * datasource that are marked unused and that are *fully contained by* any
interval in a particular collection of
+ * datasource that are marked unused and that are <b>fully contained by</b>
any interval in a particular collection of
* intervals. If the collection of intervals is empty, this method will
retrieve all unused segments.
*
* This call does not return any information about realtime segments.
@@ -312,45 +327,83 @@ public class SqlSegmentsMetadataQuery
}
/**
- * Marks all used segments that are *fully contained by* a particular
interval as unused.
+ * Marks all used segments that are <b>fully contained by</b> a particular
interval as unused.
*
- * @return the number of segments actually modified.
+ * @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval
interval)
+ {
+ return markSegmentsUnused(dataSource, interval, null);
+ }
+
+ /**
+ * Marks all used segments that are <b>fully contained by</b> a particular
interval filtered by an optional list of versions
+ * as unused.
+ *
+ * @return Number of segments updated.
+ */
+ public int markSegmentsUnused(final String dataSource, final Interval
interval, @Nullable final List<String> versions)
{
if (Intervals.isEternity(interval)) {
- return handle
- .createStatement(
- StringUtils.format(
- "UPDATE %s SET used=:used, used_status_last_updated =
:used_status_last_updated "
- + "WHERE dataSource = :dataSource AND used = true",
- dbTables.getSegmentsTable()
- )
+ final StringBuilder sb = new StringBuilder();
+ sb.append(
+ StringUtils.format(
+ "UPDATE %s SET used=:used, used_status_last_updated =
:used_status_last_updated "
+ + "WHERE dataSource = :dataSource AND used = true",
+ dbTables.getSegmentsTable()
)
+ );
+
+ final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
+
+ if (hasVersions) {
+ sb.append(getConditionForVersions(versions));
+ }
+
+ final Update stmt = handle
+ .createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
- .bind("used_status_last_updated", DateTimes.nowUtc().toString())
- .execute();
+ .bind("used_status_last_updated", DateTimes.nowUtc().toString());
+
+ if (hasVersions) {
+ bindVersionsToQuery(stmt, versions);
+ }
+
+ return stmt.execute();
} else if (Intervals.canCompareEndpointsAsStrings(interval)
&& interval.getStart().getYear() ==
interval.getEnd().getYear()) {
// Safe to write a WHERE clause with this interval. Note that it is
unsafe if the years are different, because
// that means extra characters can sneak in. (Consider a query interval
like "2000-01-01/2001-01-01" and a
// segment interval like "20001/20002".)
- return handle
- .createStatement(
- StringUtils.format(
- "UPDATE %s SET used=:used, used_status_last_updated =
:used_status_last_updated "
- + "WHERE dataSource = :dataSource AND used = true AND %s",
- dbTables.getSegmentsTable(),
-
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start",
":end")
- )
+ final StringBuilder sb = new StringBuilder();
+ sb.append(
+ StringUtils.format(
+ "UPDATE %s SET used=:used, used_status_last_updated =
:used_status_last_updated "
+ + "WHERE dataSource = :dataSource AND used = true AND %s",
+ dbTables.getSegmentsTable(),
+
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start",
":end")
)
+ );
+
+ final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
+
+ if (hasVersions) {
+ sb.append(getConditionForVersions(versions));
+ }
+
+ final Update stmt = handle
+ .createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
- .bind("used_status_last_updated", DateTimes.nowUtc().toString())
- .execute();
+ .bind("used_status_last_updated", DateTimes.nowUtc().toString());
+
+ if (hasVersions) {
+ bindVersionsToQuery(stmt, versions);
+ }
+ return stmt.execute();
} else {
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
@@ -358,7 +411,7 @@ public class SqlSegmentsMetadataQuery
retrieveSegments(
dataSource,
Collections.singletonList(interval),
- null,
+ versions,
IntervalMode.CONTAINS,
true,
null,
@@ -680,11 +733,10 @@ public class SqlSegmentsMetadataQuery
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode,
connector);
}
- if (!CollectionUtils.isNullOrEmpty(versions)) {
- final String versionsStr = versions.stream()
- .map(version -> "'" + version + "'")
- .collect(Collectors.joining(","));
- sb.append(StringUtils.format(" AND version IN (%s)", versionsStr));
+ final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
+
+ if (hasVersions) {
+ sb.append(getConditionForVersions(versions));
}
// Add the used_status_last_updated time filter only for unused segments
when maxUsedStatusLastUpdatedTime is non-null.
@@ -734,6 +786,10 @@ public class SqlSegmentsMetadataQuery
bindQueryIntervals(sql, intervals);
}
+ if (hasVersions) {
+ bindVersionsToQuery(sql, versions);
+ }
+
return sql;
}
@@ -834,6 +890,36 @@ public class SqlSegmentsMetadataQuery
return numChangedSegments;
}
+ private static String getConditionForVersions(final List<String> versions)
+ {
+ if (CollectionUtils.isNullOrEmpty(versions)) {
+ return "";
+ }
+
+ final StringBuilder sb = new StringBuilder();
+
+ sb.append(" AND version IN (");
+ for (int i = 0; i < versions.size(); i++) {
+ sb.append(StringUtils.format(":version%d", i));
+ if (i != versions.size() - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ private static void bindVersionsToQuery(final SQLStatement query, final
List<String> versions)
+ {
+ if (CollectionUtils.isNullOrEmpty(versions)) {
+ return;
+ }
+
+ for (int i = 0; i < versions.size(); i++) {
+ query.bind(StringUtils.format("version%d", i), versions.get(i));
+ }
+ }
+
enum IntervalMode
{
CONTAINS {
diff --git
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index a633640f0e2..2f4334b36ac 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -206,22 +206,21 @@ public class DataSourcesResource
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response markAsUsedNonOvershadowedSegments(
- @PathParam("dataSourceName") String dataSourceName,
- MarkDataSourceSegmentsPayload payload
+ @PathParam("dataSourceName") final String dataSourceName,
+ final SegmentsToUpdateFilter payload
)
{
if (payload == null || !payload.isValid()) {
- log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
- .entity("Invalid request payload, either interval or segmentIds
array must be specified")
+ .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE)
.build();
} else {
SegmentUpdateOperation operation = () -> {
-
final Interval interval = payload.getInterval();
+ final List<String> versions = payload.getVersions();
if (interval != null) {
- return
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName,
interval);
+ return
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName,
interval, versions);
} else {
final Set<String> segmentIds = payload.getSegmentIds();
if (segmentIds == null || segmentIds.isEmpty()) {
@@ -254,22 +253,22 @@ public class DataSourcesResource
@Consumes(MediaType.APPLICATION_JSON)
public Response markSegmentsAsUnused(
@PathParam("dataSourceName") final String dataSourceName,
- final MarkDataSourceSegmentsPayload payload,
+ final SegmentsToUpdateFilter payload,
@Context final HttpServletRequest req
)
{
if (payload == null || !payload.isValid()) {
- log.warn("Invalid request payload: [%s]", payload);
return Response
.status(Response.Status.BAD_REQUEST)
- .entity("Invalid request payload, either interval or segmentIds
array must be specified")
+ .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE)
.build();
} else {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
+ final List<String> versions = payload.getVersions();
final int numUpdatedSegments;
if (interval != null) {
- numUpdatedSegments =
segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName,
interval);
+ numUpdatedSegments =
segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName,
interval, versions);
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
@@ -302,7 +301,7 @@ public class DataSourcesResource
private static Response logAndCreateDataSourceNotFoundResponse(String
dataSourceName)
{
- log.warn("datasource not found [%s]", dataSourceName);
+ log.warn("datasource[%s] not found", dataSourceName);
return Response.noContent().build();
}
@@ -319,7 +318,7 @@ public class DataSourcesResource
.build();
}
catch (Exception e) {
- log.error(e, "Error occurred while updating segments for data
source[%s]", dataSourceName);
+ log.error(e, "Error occurred while updating segments for
datasource[%s]", dataSourceName);
return Response
.serverError()
.entity(ImmutableMap.of("error", "Exception occurred.", "message",
Throwables.getRootCause(e).toString()))
@@ -567,9 +566,9 @@ public class DataSourcesResource
private static class SegmentsLoadStatistics
{
- private int numPublishedSegments;
- private int numUnavailableSegments;
- private int numLoadedSegments;
+ private final int numPublishedSegments;
+ private final int numUnavailableSegments;
+ private final int numLoadedSegments;
SegmentsLoadStatistics(
int numPublishedSegments,
@@ -991,39 +990,58 @@ public class DataSourcesResource
return false;
}
+ /**
+ * Either {@code interval} or {@code segmentIds} array must be specified,
but not both.
+ * {@code versions} may be optionally specified only when {@code interval}
is provided.
+ */
@VisibleForTesting
- protected static class MarkDataSourceSegmentsPayload
+ static class SegmentsToUpdateFilter
{
private final Interval interval;
private final Set<String> segmentIds;
+ private final List<String> versions;
+
+ private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "Invalid
request payload. Specify either 'interval' or 'segmentIds', but not both."
+ + "
Optionally, include 'versions' only when 'interval' is provided.";
@JsonCreator
- public MarkDataSourceSegmentsPayload(
- @JsonProperty("interval") Interval interval,
- @JsonProperty("segmentIds") Set<String> segmentIds
+ public SegmentsToUpdateFilter(
+ @JsonProperty("interval") @Nullable Interval interval,
+ @JsonProperty("segmentIds") @Nullable Set<String> segmentIds,
+ @JsonProperty("versions") @Nullable List<String> versions
)
{
this.interval = interval;
this.segmentIds = segmentIds;
+ this.versions = versions;
}
+ @Nullable
@JsonProperty
public Interval getInterval()
{
return interval;
}
+ @Nullable
@JsonProperty
public Set<String> getSegmentIds()
{
return segmentIds;
}
- public boolean isValid()
+ @Nullable
+ @JsonProperty
+ public List<String> getVersions()
+ {
+ return versions;
+ }
+
+ private boolean isValid()
{
final boolean hasSegmentIds = !CollectionUtils.isNullOrEmpty(segmentIds);
if (interval == null) {
- return hasSegmentIds;
+ return hasSegmentIds && CollectionUtils.isNullOrEmpty(versions);
} else {
return !hasSegmentIds;
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index b177b40c587..a7128ce2714 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -589,6 +590,156 @@ public class SqlSegmentsMetadataManagerTest
);
}
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions() throws
Exception
+ {
+ publishWikiSegments();
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
+ "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-16T20:19:12.565Z"
+ );
+
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+
+ sqlSegmentsMetadataManager.poll();
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ Assert.assertEquals(
+ 2,
+ sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ DS.KOALA,
+ Intervals.ETERNITY,
+ ImmutableList.of("2017-10-15T20:19:12.565Z",
"2017-10-16T20:19:12.565Z")
+ )
+ );
+ sqlSegmentsMetadataManager.poll();
+
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ }
+
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws
Exception
+ {
+ publishWikiSegments();
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
+ "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-16T20:19:12.565Z"
+ );
+
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+
+ sqlSegmentsMetadataManager.poll();
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ Assert.assertEquals(
+ 2,
+ sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ DS.KOALA,
+ Intervals.of("2017-10-15/2017-10-18"),
+ ImmutableList.of("2017-10-15T20:19:12.565Z",
"2017-10-16T20:19:12.565Z")
+ )
+ );
+ sqlSegmentsMetadataManager.poll();
+
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ }
+
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsWithNonExistentVersions()
throws Exception
+ {
+ publishWikiSegments();
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
+ "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-16T20:19:12.565Z"
+ );
+
+ // Overshadowed by koalaSegment2
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ "2017-10-15T20:19:12.565Z"
+ );
+
+ publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3);
+
+ sqlSegmentsMetadataManager.poll();
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ Assert.assertEquals(
+ 0,
+ sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ DS.KOALA,
+ Intervals.ETERNITY,
+ ImmutableList.of("foo", "bar")
+ )
+ );
+ sqlSegmentsMetadataManager.poll();
+
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ }
+
@Test
public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws
Exception
{
@@ -683,7 +834,7 @@ public class SqlSegmentsMetadataManagerTest
);
// 2 out of 3 segments match the interval
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@@ -731,7 +882,7 @@ public class SqlSegmentsMetadataManagerTest
);
// 1 out of 3 segments match the interval, other 2 overlap, only the
segment fully contained will be marked unused
- Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval));
+ Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA,
theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@@ -787,7 +938,7 @@ public class SqlSegmentsMetadataManagerTest
final Interval theInterval =
Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
// 2 out of 3 segments match the interval
- Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval));
+ Assert.assertEquals(2,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
@@ -796,6 +947,104 @@ public class SqlSegmentsMetadataManagerTest
);
}
+ @Test
+ public void testMarkAsUnusedSegmentsInIntervalAndVersions() throws
IOException
+ {
+ publishWikiSegments();
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+
+ final DateTime now = DateTimes.nowUtc();
+ final String v1 = now.toString();
+ final String v2 = now.plus(Duration.standardDays(1)).toString();
+
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
+ "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
+ v1
+ );
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ v2
+ );
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
+ "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
+ v2
+ );
+
+ publisher.publishSegment(koalaSegment1);
+ publisher.publishSegment(koalaSegment2);
+ publisher.publishSegment(koalaSegment3);
+ final Interval theInterval = Intervals.of("2017-10-15/2017-10-18");
+
+ Assert.assertEquals(
+ 2,
+ sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(
+ DS.KOALA,
+ theInterval,
+ ImmutableList.of(v1, v2)
+ )
+ );
+
+ sqlSegmentsMetadataManager.poll();
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ }
+
+ @Test
+ public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions()
throws IOException
+ {
+ publishWikiSegments();
+ sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+ sqlSegmentsMetadataManager.poll();
+
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+
+ final DateTime now = DateTimes.nowUtc();
+ final String v1 = now.toString();
+ final String v2 = now.plus(Duration.standardDays(1)).toString();
+
+ final DataSegment koalaSegment1 = createSegment(
+ DS.KOALA,
+ "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000",
+ v1
+ );
+ final DataSegment koalaSegment2 = createSegment(
+ DS.KOALA,
+ "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000",
+ v2
+ );
+ final DataSegment koalaSegment3 = createSegment(
+ DS.KOALA,
+ "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000",
+ v2
+ );
+
+ publisher.publishSegment(koalaSegment1);
+ publisher.publishSegment(koalaSegment2);
+ publisher.publishSegment(koalaSegment3);
+ final Interval theInterval = Intervals.of("2017-10-15/2017-10-18");
+
+ Assert.assertEquals(
+ 0,
+ sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(
+ DS.KOALA,
+ theInterval,
+ ImmutableList.of("foo", "bar", "baz")
+ )
+ );
+
+ sqlSegmentsMetadataManager.poll();
+ Assert.assertEquals(
+ ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1,
koalaSegment2, koalaSegment3),
+
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+ );
+ }
+
@Test
public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval()
throws IOException
{
@@ -822,7 +1071,7 @@ public class SqlSegmentsMetadataManagerTest
final Interval theInterval =
Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
// 1 out of 3 segments match the interval, other 2 overlap, only the
segment fully contained will be marked unused
- Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval));
+ Assert.assertEquals(1,
sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA,
theInterval, null));
sqlSegmentsMetadataManager.poll();
Assert.assertEquals(
diff --git
a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
index b195fdad8de..7ec3152ceed 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java
@@ -148,7 +148,7 @@ public class TestDerbyConnector extends DerbyConnector
}
/**
- * A wrapper class for queries on the segments table.
+ * A wrapper class for updating the segments table.
*/
public static class SegmentsTable
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index adf12ae7054..d255d0abc7d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -87,7 +87,7 @@ public class TestSegmentsMetadataManager implements
SegmentsMetadataManager
}
@Override
- public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource,
Interval interval)
+ public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource,
Interval interval, @Nullable List<String> versions)
{
return 0;
}
@@ -116,7 +116,7 @@ public class TestSegmentsMetadataManager implements
SegmentsMetadataManager
}
@Override
- public int markAsUnusedSegmentsInInterval(String dataSource, Interval
interval)
+ public int markAsUnusedSegmentsInInterval(String dataSource, Interval
interval, @Nullable List<String> versions)
{
return 0;
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index bd673f078b2..f6bccbeb7da 100644
---
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.server.http;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -36,6 +38,7 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
@@ -734,15 +737,56 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsInterval()
{
Interval interval = Intervals.of("2010-01-22/P1D");
- int numUpdatedSegments =
-
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"),
EasyMock.eq(interval));
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
+ );
+ EasyMock.expect(numUpdatedSegments).andReturn(3).once();
+ EasyMock.replay(segmentsMetadataManager, inventoryView, server);
+
+ DataSourcesResource dataSourcesResource = createResource();
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
+ );
+ Assert.assertEquals(200, response.getStatus());
+ EasyMock.verify(segmentsMetadataManager, inventoryView, server);
+ }
+
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithVersions()
+ {
+ Interval interval = Intervals.of("2010-01-22/P1D");
+
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval),
EasyMock.eq(ImmutableList.of("v0"))
+ );
EasyMock.expect(numUpdatedSegments).andReturn(3).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
+ new DataSourcesResource.SegmentsToUpdateFilter(interval, null,
ImmutableList.of("v0"))
+ );
+ Assert.assertEquals(200, response.getStatus());
+ EasyMock.verify(segmentsMetadataManager, inventoryView, server);
+ }
+
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsIntervalWithNonExistentVersion()
+ {
+ Interval interval = Intervals.of("2010-01-22/P1D");
+
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval),
EasyMock.eq(ImmutableList.of("foo"))
+ );
+ EasyMock.expect(numUpdatedSegments).andReturn(0).once();
+ EasyMock.replay(segmentsMetadataManager, inventoryView, server);
+
+ DataSourcesResource dataSourcesResource = createResource();
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(interval, null,
ImmutableList.of("foo"))
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@@ -752,8 +796,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated()
{
Interval interval = Intervals.of("2010-01-22/P1D");
- int numUpdatedSegments =
-
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"),
EasyMock.eq(interval));
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
+ );
EasyMock.expect(numUpdatedSegments).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@@ -761,7 +806,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
+ new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
);
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@@ -771,8 +816,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsSet()
{
Set<String> segmentIds =
ImmutableSet.of(dataSegmentList.get(1).getId().toString());
- int numUpdatedSegments =
-
segmentsMetadataManager.markAsUsedNonOvershadowedSegments(EasyMock.eq("datasource1"),
EasyMock.eq(segmentIds));
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegments(
+ EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)
+ );
EasyMock.expect(numUpdatedSegments).andReturn(3).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@@ -780,7 +826,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds)
+ new DataSourcesResource.SegmentsToUpdateFilter(null, segmentIds, null)
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@@ -790,8 +836,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsIntervalException()
{
Interval interval = Intervals.of("2010-01-22/P1D");
- int numUpdatedSegments =
-
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"),
EasyMock.eq(interval));
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
+ );
EasyMock.expect(numUpdatedSegments).andThrow(new
RuntimeException("Error!")).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@@ -799,7 +846,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null)
+ new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null)
);
Assert.assertEquals(500, response.getStatus());
EasyMock.verify(segmentsMetadataManager, inventoryView, server);
@@ -809,8 +856,9 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource()
{
Interval interval = Intervals.of("2010-01-22/P1D");
- int numUpdatedSegments =
-
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"),
EasyMock.eq(interval));
+ int numUpdatedSegments =
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(
+ EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull()
+ );
EasyMock.expect(numUpdatedSegments).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
@@ -818,7 +866,7 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new
DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"),
null)
+ new
DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"),
null, null)
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
@@ -826,13 +874,13 @@ public class DataSourcesResourceTest
}
@Test
- public void
testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIds()
+ public void
testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null)
+ new DataSourcesResource.SegmentsToUpdateFilter(null, null, null)
);
Assert.assertEquals(400, response.getStatus());
}
@@ -844,20 +892,22 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new
DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"),
ImmutableSet.of())
+ new DataSourcesResource.SegmentsToUpdateFilter(
+ Intervals.of("2010-01-22/P1D"), ImmutableSet.of(), null
+ )
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
}
@Test
- public void
testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndNullSegmentIds()
+ public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullInterval()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new
DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"),
null)
+ new
DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"),
null, null)
);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
@@ -870,32 +920,99 @@ public class DataSourcesResourceTest
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new
DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"),
ImmutableSet.of("segment1"))
+ new
DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"),
ImmutableSet.of("segment1"), null)
+ );
+ Assert.assertEquals(400, response.getStatus());
+ }
+
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndSegmentIdsAndVersions()
+ {
+ DataSourcesResource dataSourcesResource = createResource();
+
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(
+ Intervals.of("2020/2030"), ImmutableSet.of("seg1"),
ImmutableList.of("v1", "v2")
+ )
+ );
+ Assert.assertEquals(400, response.getStatus());
+ }
+
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsWithEmptySegmentIds()
+ {
+ DataSourcesResource dataSourcesResource = createResource();
+
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(null,
ImmutableSet.of(), null)
+ );
+ Assert.assertEquals(400, response.getStatus());
+ }
+
+ @Test
+ public void testMarkAsUsedNonOvershadowedSegmentsWithEmptyVersions()
+ {
+ DataSourcesResource dataSourcesResource = createResource();
+
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(null, null,
ImmutableList.of())
);
Assert.assertEquals(400, response.getStatus());
}
@Test
- public void
testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndEmptySegmentIds()
+ public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullVersions()
{
DataSourcesResource dataSourcesResource = createResource();
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
- new DataSourcesResource.MarkDataSourceSegmentsPayload(null,
ImmutableSet.of())
+ new DataSourcesResource.SegmentsToUpdateFilter(null, null,
ImmutableList.of("v1", "v2"))
);
Assert.assertEquals(400, response.getStatus());
}
@Test
- public void testMarkAsUsedNonOvershadowedSegmentsNoPayload()
+ public void
testMarkAsUsedNonOvershadowedSegmentsWithNonNullSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
- Response response =
dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null);
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(null,
ImmutableSet.of("segment1"), ImmutableList.of("v1", "v2"))
+ );
Assert.assertEquals(400, response.getStatus());
}
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndVersions()
+ {
+ DataSourcesResource dataSourcesResource = createResource();
+
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY,
null, ImmutableList.of("v1", "v2"))
+ );
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
+ }
+
+ @Test
+ public void
testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndEmptyVersions()
+ {
+ DataSourcesResource dataSourcesResource = createResource();
+
+ Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
+ "datasource1",
+ new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY,
null, ImmutableList.of())
+ );
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
+ }
+
@Test
public void testSegmentLoadChecksForVersion()
{
@@ -1039,12 +1156,13 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
- .collect(Collectors.toSet())
+ .collect(Collectors.toSet()),
+ null
);
DataSourcesResource dataSourcesResource = createResource();
@@ -1068,12 +1186,13 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
- .collect(Collectors.toSet())
+ .collect(Collectors.toSet()),
+ null
);
DataSourcesResource dataSourcesResource = createResource();
@@ -1099,16 +1218,16 @@ public class DataSourcesResourceTest
.once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(
null,
segmentIds.stream()
.map(SegmentId::toString)
- .collect(Collectors.toSet())
+ .collect(Collectors.toSet()),
+ null
);
- DataSourcesResource dataSourcesResource =
- createResource();
+ DataSourcesResource dataSourcesResource = createResource();
Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@@ -1120,11 +1239,11 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
-
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval)).andReturn(1).once();
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, null)).andReturn(1).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval,
null);
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
null);
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@@ -1140,11 +1259,11 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
-
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval)).andReturn(0).once();
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, null)).andReturn(0).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval,
null);
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
null);
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@@ -1159,16 +1278,15 @@ public class DataSourcesResourceTest
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
-
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval))
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, null))
.andThrow(new RuntimeException("Exception occurred"))
.once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval,
null);
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
null);
- DataSourcesResource dataSourcesResource =
- createResource();
+ DataSourcesResource dataSourcesResource = createResource();
Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@@ -1179,11 +1297,50 @@ public class DataSourcesResourceTest
public void testMarkAsUnusedSegmentsInIntervalNoDataSource()
{
final Interval theInterval = Intervals.of("2010-01-01/P1D");
-
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval)).andReturn(0).once();
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, null))
+ .andReturn(0).once();
+ EasyMock.replay(segmentsMetadataManager, inventoryView, server);
+
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
null);
+ DataSourcesResource dataSourcesResource = createResource();
+ prepareRequestForAudit();
+
+ Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0),
response.getEntity());
+ EasyMock.verify(segmentsMetadataManager);
+ }
+
+ @Test
+ public void testMarkAsUnusedSegmentsInIntervalWithVersions()
+ {
+ final Interval theInterval = Intervals.of("2010-01-01/P1D");
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, ImmutableList.of("v1")))
+ .andReturn(2).once();
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval,
null);
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
ImmutableList.of("v1"));
+ DataSourcesResource dataSourcesResource = createResource();
+ prepareRequestForAudit();
+
+ Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("numChangedSegments", 2),
response.getEntity());
+ EasyMock.verify(segmentsMetadataManager);
+ }
+
+ @Test
+ public void testMarkAsUnusedSegmentsInIntervalWithNonExistentVersion()
+ {
+ final Interval theInterval = Intervals.of("2010-01-01/P1D");
+
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1",
theInterval, ImmutableList.of("foo")))
+ .andReturn(0).once();
+ EasyMock.replay(segmentsMetadataManager, inventoryView, server);
+
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null,
ImmutableList.of("foo"));
DataSourcesResource dataSourcesResource = createResource();
prepareRequestForAudit();
@@ -1193,6 +1350,21 @@ public class DataSourcesResourceTest
EasyMock.verify(segmentsMetadataManager);
}
+ @Test
+ public void testSegmentsToUpdateFilterSerde() throws JsonProcessingException
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final String payload =
"{\"interval\":\"2023-01-01T00:00:00.000Z/2024-01-01T00:00:00.000Z\",\"segmentIds\":null,\"versions\":[\"v1\"]}";
+
+ final DataSourcesResource.SegmentsToUpdateFilter obj =
+ mapper.readValue(payload,
DataSourcesResource.SegmentsToUpdateFilter.class);
+ Assert.assertEquals(Intervals.of("2023/2024"), obj.getInterval());
+ Assert.assertEquals(ImmutableList.of("v1"), obj.getVersions());
+ Assert.assertNull(obj.getSegmentIds());
+
+ Assert.assertEquals(payload, mapper.writeValueAsString(obj));
+ }
+
@Test
public void testMarkSegmentsAsUnusedNullPayload()
{
@@ -1202,18 +1374,19 @@ public class DataSourcesResourceTest
Assert.assertEquals(400, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(
- "Invalid request payload, either interval or segmentIds array must be
specified",
+ "Invalid request payload. Specify either 'interval' or 'segmentIds',
but not both."
+ + " Optionally, include 'versions' only when 'interval' is provided.",
response.getEntity()
);
}
@Test
- public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIds()
+ public void
testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIdsAndVersions()
{
DataSourcesResource dataSourcesResource = createResource();
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null);
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new DataSourcesResource.SegmentsToUpdateFilter(null, null, null);
Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(400, response.getStatus());
@@ -1224,10 +1397,9 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedWithNonNullIntervalAndEmptySegmentIds()
{
DataSourcesResource dataSourcesResource = createResource();
-
- final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
- new
DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"),
ImmutableSet.of());
prepareRequestForAudit();
+ final DataSourcesResource.SegmentsToUpdateFilter payload =
+ new
DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-01/P1D"),
ImmutableSet.of(), null);
Response response =
dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request);
Assert.assertEquals(200, response.getStatus());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]