This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b250c5  Allow kill task to mark segments as unused (#11501)
9b250c5 is described below

commit 9b250c54aa1b18c21ff8369ee4a4a6015bbafc40
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Jul 29 10:48:43 2021 -0500

    Allow kill task to mark segments as unused (#11501)
    
    * Allow kill task to mark segments as unused
    
    * Add IndexerSQLMetadataStorageCoordinator test
    
    * Update docs/ingestion/data-management.md
    
    Co-authored-by: Jihoon Son <[email protected]>
    
    * Add warning to kill task doc
    
    Co-authored-by: Jihoon Son <[email protected]>
---
 docs/ingestion/data-management.md                  |  9 ++-
 .../common/actions/MarkSegmentsAsUnusedAction.java | 67 ++++++++--------------
 .../druid/indexing/common/actions/TaskAction.java  |  1 +
 .../common/task/KillUnusedSegmentsTask.java        | 23 +++++++-
 ...ClientKillUnusedSegmentsTaskQuerySerdeTest.java |  8 ++-
 .../common/task/KillUnusedSegmentsTaskTest.java    | 53 ++++++++++++++++-
 .../druid/indexing/overlord/TaskLifecycleTest.java |  8 ++-
 .../TestIndexerMetadataStorageCoordinator.java     |  6 ++
 .../ClientKillUnusedSegmentsTaskQuery.java         | 20 +++++--
 .../client/indexing/HttpIndexingServiceClient.java |  2 +-
 .../IndexerMetadataStorageCoordinator.java         | 10 ++++
 .../IndexerSQLMetadataStorageCoordinator.java      | 27 +++++++++
 .../ClientKillUnusedSegmentsTaskQueryTest.java     |  9 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 32 +++++++++++
 14 files changed, 218 insertions(+), 57 deletions(-)

diff --git a/docs/ingestion/data-management.md 
b/docs/ingestion/data-management.md
index c9e592f..eb176a0 100644
--- a/docs/ingestion/data-management.md
+++ b/docs/ingestion/data-management.md
@@ -95,7 +95,9 @@ A data deletion tutorial is available at [Tutorial: Deleting 
data](../tutorials/
 
 ## Kill Task
 
-Kill tasks delete all information about a segment and removes it from deep 
storage. Segments to kill must be unused (used==0) in the Druid segment table. 
The available grammar is:
+The kill task deletes all information about segments and removes them from 
deep storage. Segments to kill must be unused (used==0) in the Druid segment 
table.
+
+The available grammar is:
 
 ```json
 {
@@ -103,10 +105,15 @@ Kill tasks delete all information about a segment and 
removes it from deep stora
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "interval" : <all_segments_in_this_interval_will_die!>,
+    "markAsUnused": <true|false>,
     "context": <task context>
 }
 ```
 
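+If `markAsUnused` is true (default is false), the kill task first marks as unused every segment whose interval lies entirely within the specified interval, and then deletes the unused segments within that interval. Segments that only partially overlap the specified interval are not marked as unused.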
+
+**WARNING!** The kill task permanently removes all information about the affected segments from the metadata store and deep storage. These segments cannot be recovered after the kill task runs; this operation cannot be undone.
+
 ## Retention
 
 Druid supports retention rules, which are used to define intervals of time 
where data should be preserved, and intervals where data should be discarded.
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
similarity index 52%
copy from 
server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
copy to 
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
index ec008d3..5ed7b7e 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java
@@ -17,56 +17,34 @@
  * under the License.
  */
 
-package org.apache.druid.client.indexing;
+package org.apache.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
 import org.joda.time.Interval;
 
-import java.util.Objects;
-
-/**
- * Client representation of 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON 
searialization
- * fields of this class must correspond to those of 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except
- * for "id" and "context" fields.
- */
-public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
+public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
 {
-  public static final String TYPE = "kill";
-
-  private final String id;
+  @JsonIgnore
   private final String dataSource;
+
+  @JsonIgnore
   private final Interval interval;
 
   @JsonCreator
-  public ClientKillUnusedSegmentsTaskQuery(
-      @JsonProperty("id") String id,
+  public MarkSegmentsAsUnusedAction(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval
   )
   {
-    this.id = Preconditions.checkNotNull(id, "id");
     this.dataSource = dataSource;
     this.interval = interval;
   }
 
   @JsonProperty
-  @Override
-  public String getId()
-  {
-    return id;
-  }
-
-  @JsonProperty
-  @Override
-  public String getType()
-  {
-    return TYPE;
-  }
-
-  @JsonProperty
-  @Override
   public String getDataSource()
   {
     return dataSource;
@@ -79,23 +57,24 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
   }
 
   @Override
-  public boolean equals(Object o)
+  public TypeReference<Integer> getReturnTypeReference()
+  {
+    return new TypeReference<Integer>()
+    {
+    };
+  }
+
+  @Override
+  public Integer perform(Task task, TaskActionToolbox toolbox)
   {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ClientKillUnusedSegmentsTaskQuery that = 
(ClientKillUnusedSegmentsTaskQuery) o;
-    return Objects.equals(id, that.id) &&
-           Objects.equals(dataSource, that.dataSource) &&
-           Objects.equals(interval, that.interval);
+    int numMarked = toolbox.getIndexerMetadataStorageCoordinator()
+                           .markSegmentsAsUnusedWithinInterval(dataSource, 
interval);
+    return numMarked;
   }
 
   @Override
-  public int hashCode()
+  public boolean isAudited()
   {
-    return Objects.hash(id, dataSource, interval);
+    return true;
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index dd11dee..c84499d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.common.task.Task;
     @JsonSubTypes.Type(name = "segmentListUsed", value = 
RetrieveUsedSegmentsAction.class),
     // Type name doesn't correspond to the name of the class for backward 
compatibility.
     @JsonSubTypes.Type(name = "segmentListUnused", value = 
RetrieveUnusedSegmentsAction.class),
+    @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = 
MarkSegmentsAsUnusedAction.class),
     @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
     @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = 
SegmentMetadataUpdateAction.class),
     @JsonSubTypes.Type(name = SegmentAllocateAction.TYPE, value = 
SegmentAllocateAction.class),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 602c21e..c071644 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -25,11 +25,13 @@ import 
org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
 import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentNukeAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -50,13 +52,17 @@ import java.util.stream.Collectors;
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
+  private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
+
+  private final boolean markAsUnused;
 
   @JsonCreator
   public KillUnusedSegmentsTask(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("markAsUnused") Boolean markAsUnused
   )
   {
     super(
@@ -65,6 +71,13 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
         interval,
         context
     );
+    this.markAsUnused = markAsUnused != null && markAsUnused;
+  }
+
+  @JsonProperty
+  public boolean isMarkAsUnused()
+  {
+    return markAsUnused;
   }
 
   @Override
@@ -77,6 +90,14 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = 
getTaskLockMap(toolbox.getTaskActionClient());
+
+    if (markAsUnused) {
+      int numMarked = toolbox.getTaskActionClient().submit(
+          new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
+      );
+      LOG.info("Marked %d segments as unused.", numMarked);
+    }
+
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index a049846..e4583c9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -50,13 +50,15 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     final ClientKillUnusedSegmentsTaskQuery taskQuery = new 
ClientKillUnusedSegmentsTaskQuery(
         "killTaskId",
         "datasource",
-        Intervals.of("2020-01-01/P1D")
+        Intervals.of("2020-01-01/P1D"),
+        true
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
     final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) 
objectMapper.readValue(json, Task.class);
     Assert.assertEquals(taskQuery.getId(), fromJson.getId());
     Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
     Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
+    Assert.assertEquals(taskQuery.getMarkAsUnused(), 
fromJson.isMarkAsUnused());
   }
 
   @Test
@@ -66,7 +68,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
         null,
         "datasource",
         Intervals.of("2020-01-01/P1D"),
-        null
+        null,
+        true
     );
     final byte[] json = objectMapper.writeValueAsBytes(task);
     final ClientKillUnusedSegmentsTaskQuery taskQuery = 
(ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -76,5 +79,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(task.getId(), taskQuery.getId());
     Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
     Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
+    Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index 688158d..e796fde 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -74,7 +74,58 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     );
 
     final KillUnusedSegmentsTask task =
-        new KillUnusedSegmentsTask(null, DATA_SOURCE, 
Intervals.of("2019-03-01/2019-04-01"), null);
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2019-03-01/2019-04-01"),
+            null,
+            false
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments =
+        
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
+
+    
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"),
 version)), unusedSegments);
+    Assertions.assertThat(
+        getMetadataStorageCoordinator()
+            .retrieveUsedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
+    ).containsExactlyInAnyOrder(
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+    );
+  }
+
+
+  @Test
+  public void testKillWithMarkUnused() throws Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final Set<DataSegment> segments = ImmutableSet.of(
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+        newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+    );
+    final Set<DataSegment> announced = 
getMetadataStorageCoordinator().announceHistoricalSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    Assert.assertTrue(
+        getSegmentsMetadataManager().markSegmentAsUnused(
+            newSegment(Intervals.of("2019-02-01/2019-03-01"), 
version).getId().toString()
+        )
+    );
+
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2019-03-01/2019-04-01"),
+            null,
+            true
+        );
 
     Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 032c2fd..80e31ac 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -929,7 +929,13 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
     }
 
     final Task killUnusedSegmentsTask =
-        new KillUnusedSegmentsTask(null, "test_kill_task", 
Intervals.of("2011-04-01/P4D"), null);
+        new KillUnusedSegmentsTask(
+            null,
+            "test_kill_task",
+            Intervals.of("2011-04-01/P4D"),
+            null,
+            false
+        );
 
     final TaskStatus status = runTask(killUnusedSegmentsTask);
     Assert.assertEquals(taskLocation, status.getLocation());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index fb274f3..2dcff76 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -107,6 +107,12 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
+  public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval 
interval)
+  {
+    return 0;
+  }
+
+  @Override
   public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
   {
     Set<DataSegment> added = new HashSet<>();
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index ec008d3..4435e5f 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -38,17 +38,20 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
   private final String id;
   private final String dataSource;
   private final Interval interval;
+  private final Boolean markAsUnused;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("markAsUnused") Boolean markAsUnused
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.dataSource = dataSource;
     this.interval = interval;
+    this.markAsUnused = markAsUnused;
   }
 
   @JsonProperty
@@ -78,6 +81,12 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
     return interval;
   }
 
+  @JsonProperty
+  public Boolean getMarkAsUnused()
+  {
+    return markAsUnused;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -88,14 +97,15 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
       return false;
     }
     ClientKillUnusedSegmentsTaskQuery that = 
(ClientKillUnusedSegmentsTaskQuery) o;
-    return Objects.equals(id, that.id) &&
-           Objects.equals(dataSource, that.dataSource) &&
-           Objects.equals(interval, that.interval);
+    return Objects.equals(id, that.id)
+           && Objects.equals(dataSource, that.dataSource)
+           && Objects.equals(interval, that.interval)
+           && Objects.equals(markAsUnused, that.markAsUnused);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, interval);
+    return Objects.hash(id, dataSource, interval, markAsUnused);
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 44ba61c..60bbae6 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -70,7 +70,7 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
   public void killUnusedSegments(String idPrefix, String dataSource, Interval 
interval)
   {
     final String taskId = IdUtils.newTaskId(idPrefix, 
ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
-    final ClientTaskQuery taskQuery = new 
ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval);
+    final ClientTaskQuery taskQuery = new 
ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false);
     runTask(taskId, taskQuery);
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 513f5b9..acb617f 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -135,6 +135,16 @@ public interface IndexerMetadataStorageCoordinator
   List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, 
Interval interval);
 
   /**
+   * Marks as unused all segments of the given data source whose intervals lie entirely within the given interval.
+   *
+   * @param dataSource The data source the segments belong to
+   * @param interval   Only segments whose intervals are fully contained in this interval are marked unused.
+   *
+   * @return number of segments marked unused
+   */
+  int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interval);
+
+  /**
    * Attempts to insert a set of segments to the metadata storage. Returns the 
set of segments actually added (segments
    * with identifiers already in the metadata storage will not be added).
    *
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 46d6c0e..4887c90 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -226,6 +226,33 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return matchingSegments;
   }
 
+  @Override
+  public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval 
interval)
+  {
+    int numSegmentsMarkedUnused = connector.retryTransaction(
+        (handle, status) -> {
+          return handle
+              .createStatement(
+                  StringUtils.format(
+                      "UPDATE %s SET used=false WHERE dataSource = :dataSource 
"
+                      + "AND start >= :start AND %2$send%2$s <= :end",
+                      dbTables.getSegmentsTable(),
+                      connector.getQuoteString()
+                  )
+              )
+              .bind("dataSource", dataSource)
+              .bind("start", interval.getStart().toString())
+              .bind("end", interval.getEnd().toString())
+              .execute();
+        },
+        3,
+        SQLMetadataConnector.DEFAULT_MAX_TRIES
+    );
+
+    log.info("Marked %,d segments unused for %s for interval %s.", 
numSegmentsMarkedUnused, dataSource, interval);
+    return numSegmentsMarkedUnused;
+  }
+
   private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index fdd9f78..0e6c0c8 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -33,13 +33,14 @@ public class ClientKillUnusedSegmentsTaskQueryTest
   private static final String DATA_SOURCE = "data_source";
   public static final DateTime START = DateTimes.nowUtc();
   private static final Interval INTERVAL = new Interval(START, START.plus(1));
+  private static final Boolean MARK_UNUSED = true;
 
   ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery;
 
   @Before
   public void setUp()
   {
-    clientKillUnusedSegmentsQuery = new 
ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL);
+    clientKillUnusedSegmentsQuery = new 
ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true);
   }
 
   @After
@@ -67,6 +68,12 @@ public class ClientKillUnusedSegmentsTaskQueryTest
   }
 
   @Test
+  public void testGetMarkUnused()
+  {
+    Assert.assertEquals(MARK_UNUSED, 
clientKillUnusedSegmentsQuery.getMarkAsUnused());
+  }
+
+  @Test
   public void testEquals()
   {
     EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class)
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 7acd90f..135c893 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1549,4 +1549,36 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
     Assert.assertEquals(0, deletedCount);
   }
+
+  @Test
+  public void testMarkSegmentsAsUnusedWithinInterval() throws IOException
+  {
+    coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, 
existingSegment2));
+
+    // The interval fully covers existingSegment1 and only partially overlaps existingSegment2,
+    // so only existingSegment1 is marked unused.
+    coordinator.markSegmentsAsUnusedWithinInterval(
+        existingSegment1.getDataSource(),
+        Intervals.of("1993-12-31T12Z/1994-01-02T12Z")
+    );
+
+    Assert.assertEquals(
+        ImmutableSet.of(existingSegment1),
+        ImmutableSet.copyOf(
+            coordinator.retrieveUnusedSegmentsForInterval(
+                existingSegment1.getDataSource(),
+                
existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
+            )
+        )
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(),
+        ImmutableSet.copyOf(
+            coordinator.retrieveUnusedSegmentsForInterval(
+                existingSegment2.getDataSource(),
+                
existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
+            )
+        )
+    );
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to