This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c152c82c Add a TaskReport for "kill" tasks (#15023)
d7c152c82c is described below

commit d7c152c82c16983109a303cb4ea5eb03861dbedf
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Sep 23 07:44:27 2023 +0530

    Add a TaskReport for "kill" tasks (#15023)
    
    - Add `KillTaskReport` that contains stats for `numSegmentsKilled`,
    `numBatchesProcessed`, `numSegmentsMarkedAsUnused`
    - Fix bug where exception message had no formatter but was still being
    passed some args.
    - Add some comments regarding deprecation of `markAsUnused` flag.
---
 .../common/IngestionStatsAndErrorsTaskReport.java  |   6 +-
 .../druid/indexing/common/KillTaskReport.java      | 133 +++++++++++++++++++++
 .../apache/druid/indexing/common/TaskReport.java   |   3 +-
 .../common/task/KillUnusedSegmentsTask.java        |  89 +++++++-------
 .../common/task/KillUnusedSegmentsTaskTest.java    |  54 +++++++--
 .../ClientKillUnusedSegmentsTaskQuery.java         |   9 +-
 6 files changed, 237 insertions(+), 57 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 9d9d866853..35ae2f6698 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Objects;
@@ -28,11 +29,12 @@ public class IngestionStatsAndErrorsTaskReport implements 
TaskReport
   public static final String REPORT_KEY = "ingestionStatsAndErrors";
 
   @JsonProperty
-  private String taskId;
+  private final String taskId;
 
   @JsonProperty
-  private IngestionStatsAndErrorsTaskReportData payload;
+  private final IngestionStatsAndErrorsTaskReportData payload;
 
+  @JsonCreator
   public IngestionStatsAndErrorsTaskReport(
       @JsonProperty("taskId") String taskId,
       @JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
new file mode 100644
index 0000000000..f97f761166
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+public class KillTaskReport implements TaskReport
+{
+  public static final String REPORT_KEY = "killUnusedSegments";
+
+  private final String taskId;
+  private final Stats stats;
+
+  @JsonCreator
+  public KillTaskReport(
+      @JsonProperty("taskId") String taskId,
+      @JsonProperty("payload") Stats stats
+  )
+  {
+    this.taskId = taskId;
+    this.stats = stats;
+  }
+
+  @Override
+  @JsonProperty
+  public String getTaskId()
+  {
+    return taskId;
+  }
+
+  @Override
+  public String getReportKey()
+  {
+    return REPORT_KEY;
+  }
+
+  @Override
+  @JsonProperty
+  public Object getPayload()
+  {
+    return stats;
+  }
+
+  public static class Stats
+  {
+    private final int numSegmentsKilled;
+    private final int numBatchesProcessed;
+    private final Integer numSegmentsMarkedAsUnused;
+
+    @JsonCreator
+    public Stats(
+        @JsonProperty("numSegmentsKilled") int numSegmentsKilled,
+        @JsonProperty("numBatchesProcessed") int numBatchesProcessed,
+        @JsonProperty("numSegmentsMarkedAsUnused") @Nullable Integer 
numSegmentsMarkedAsUnused
+    )
+    {
+      this.numSegmentsKilled = numSegmentsKilled;
+      this.numBatchesProcessed = numBatchesProcessed;
+      this.numSegmentsMarkedAsUnused = numSegmentsMarkedAsUnused;
+    }
+
+    @JsonProperty
+    public int getNumSegmentsKilled()
+    {
+      return numSegmentsKilled;
+    }
+
+    @JsonProperty
+    public int getNumBatchesProcessed()
+    {
+      return numBatchesProcessed;
+    }
+
+    @Nullable
+    @JsonProperty
+    public Integer getNumSegmentsMarkedAsUnused()
+    {
+      return numSegmentsMarkedAsUnused;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Stats that = (Stats) o;
+      return numSegmentsKilled == that.numSegmentsKilled
+             && numBatchesProcessed == that.numBatchesProcessed
+             && Objects.equals(this.numSegmentsMarkedAsUnused, 
that.numSegmentsMarkedAsUnused);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(numSegmentsKilled, numBatchesProcessed, 
numSegmentsMarkedAsUnused);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Stats{" +
+             "numSegmentsKilled=" + numSegmentsKilled +
+             ", numBatchesProcessed=" + numBatchesProcessed +
+             ", numSegmentsMarkedAsUnused=" + numSegmentsMarkedAsUnused +
+             '}';
+    }
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 5e2f5b6c74..2ac260d72f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -31,7 +31,8 @@ import java.util.Map;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = 
IngestionStatsAndErrorsTaskReport.class)
+    @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value = 
IngestionStatsAndErrorsTaskReport.class),
+    @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = 
KillTaskReport.class)
 })
 public interface TaskReport
 {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 35653aa230..be1d02f429 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -24,12 +24,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.KillTaskReport;
 import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
 import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
@@ -46,7 +47,6 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
  * The client representation of this task is {@link 
ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
+ * <p>
+ * The field {@link #isMarkAsUnused()} is now deprecated.
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
@@ -77,18 +79,18 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
    */
   private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
 
+  @Deprecated
   private final boolean markAsUnused;
   /**
    * Split processing to try and keep each nuke operation relatively short, in 
the case that either
    * the database or the storage layer is particularly slow.
    */
   private final int batchSize;
-  @Nullable private final Integer limit;
-
 
-  // counters included primarily for testing
-  private int numSegmentsKilled = 0;
-  private long numBatchesProcessed = 0;
+  /**
+   * Maximum number of segments that can be killed.
+   */
+  @Nullable private final Integer limit;
 
   @JsonCreator
   public KillUnusedSegmentsTask(
@@ -109,22 +111,26 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     );
     this.markAsUnused = markAsUnused != null && markAsUnused;
     this.batchSize = (batchSize != null) ? batchSize : 
DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
-    Preconditions.checkArgument(this.batchSize > 0, "batchSize should be 
greater than zero");
-    if (null != limit && limit <= 0) {
-      throw InvalidInput.exception(
-          "limit [%d] is invalid. It must be a positive integer.",
-          limit
-      );
+    if (this.batchSize <= 0) {
+      throw InvalidInput.exception("batchSize[%d] must be a positive 
integer.", limit);
     }
-    if (limit != null && markAsUnused != null && markAsUnused) {
-      throw InvalidInput.exception(
-          "limit cannot be provided with markAsUnused.",
-          limit
-      );
+    if (limit != null && limit <= 0) {
+      throw InvalidInput.exception("Limit[%d] must be a positive integer.", 
limit);
+    }
+    if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
+      throw InvalidInput.exception("Limit cannot be provided when markAsUnused 
is enabled.");
     }
     this.limit = limit;
   }
 
+  /**
+   * This field has been deprecated as "kill" tasks should not be responsible 
for
+   * marking segments as unused. Instead, users should call the Coordinator API
+   * {@code /{dataSourceName}/markUnused} to explicitly mark segments as 
unused.
+   * Segments may also be marked unused by the Coordinator if they become 
overshadowed
+   * or have a {@code DropRule} applied to them.
+   */
+  @Deprecated
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   public boolean isMarkAsUnused()
@@ -160,30 +166,22 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return ImmutableSet.of();
   }
 
-  @JsonIgnore
-  @VisibleForTesting
-  long getNumBatchesProcessed()
-  {
-    return numBatchesProcessed;
-  }
-
-  @JsonIgnore
-  @VisibleForTesting
-  long getNumSegmentsKilled()
-  {
-    return numSegmentsKilled;
-  }
-
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = 
getTaskLockMap(toolbox.getTaskActionClient());
 
+    // Track stats for reporting
+    int numSegmentsKilled = 0;
+    int numBatchesProcessed = 0;
+    final int numSegmentsMarkedAsUnused;
     if (markAsUnused) {
-      int numMarked = toolbox.getTaskActionClient().submit(
+      numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
           new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
       );
-      LOG.info("Marked %d segments as unused.", numMarked);
+      LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
+    } else {
+      numSegmentsMarkedAsUnused = 0;
     }
 
     // List unused segments
@@ -194,7 +192,7 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
         "Starting kill with batchSize[%d], up to limit[%d] segments will be 
deleted%s",
         batchSize,
         limit,
-        numTotalBatches != null ? StringUtils.format(" in([%d] batches]).", 
numTotalBatches) : "."
+        numTotalBatches != null ? StringUtils.format(" in [%d] batches.", 
numTotalBatches) : "."
     );
     do {
       if (nextBatchSize <= 0) {
@@ -229,16 +227,21 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
       nextBatchSize = computeNextBatchSize(numSegmentsKilled);
     } while (unusedSegments.size() != 0 && (null == numTotalBatches || 
numBatchesProcessed < numTotalBatches));
 
-    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. 
Deleted total [%d] unused segments "
-             + "in [%d] batches.",
-        getId(),
-        getDataSource(),
-        getInterval(),
-        numSegmentsKilled,
-        numBatchesProcessed
+    final String taskId = getId();
+    LOG.info(
+        "Finished kill task[%s] for dataSource[%s] and interval[%s]."
+        + " Deleted total [%d] unused segments in [%d] batches.",
+        taskId, getDataSource(), getInterval(), numSegmentsKilled, 
numBatchesProcessed
+    );
+
+    final KillTaskReport.Stats stats =
+        new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed, 
numSegmentsMarkedAsUnused);
+    toolbox.getTaskReportFileWriter().write(
+        taskId,
+        TaskReport.buildTaskReports(new KillTaskReport(taskId, stats))
     );
 
-    return TaskStatus.success(getId());
+    return TaskStatus.success(taskId);
   }
 
   @JsonIgnore
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f320df824a..d77f2e6c24 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
 import org.assertj.core.api.Assertions;
@@ -35,13 +38,14 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class KillUnusedSegmentsTaskTest extends IngestionTestBase
 {
   private static final String DATA_SOURCE = "dataSource";
 
-  private TaskRunner taskRunner;
+  private TestTaskRunner taskRunner;
 
   @Before
   public void setup()
@@ -99,8 +103,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
 
-    Assert.assertEquals(2L, task.getNumBatchesProcessed());
-    Assert.assertEquals(1, task.getNumSegmentsKilled());
+    Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
   }
 
 
@@ -148,8 +151,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
-    Assert.assertEquals(2L, task.getNumBatchesProcessed());
-    Assert.assertEquals(1, task.getNumSegmentsKilled());
+
+    Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats());
   }
 
   @Test
@@ -209,8 +212,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(4L, task.getNumBatchesProcessed());
-    Assert.assertEquals(4, task.getNumSegmentsKilled());
+    Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
   }
 
   @Test
@@ -246,8 +248,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(3L, task.getNumBatchesProcessed());
-    Assert.assertEquals(4, task.getNumSegmentsKilled());
+
+    Assert.assertEquals(new KillTaskReport.Stats(4, 3, 4), getReportedStats());
   }
 
   @Test
@@ -362,6 +364,38 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     Assert.assertEquals(2, (int) task.getNumTotalBatches());
   }
 
+  @Test
+  public void testKillTaskReportSerde() throws Exception
+  {
+    final String taskId = "test_serde_task";
+
+    final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
+    KillTaskReport report = new KillTaskReport(taskId, stats);
+
+    String json = getObjectMapper().writeValueAsString(report);
+    TaskReport deserializedReport = getObjectMapper().readValue(json, 
TaskReport.class);
+    Assert.assertTrue(deserializedReport instanceof KillTaskReport);
+
+    KillTaskReport deserializedKillReport = (KillTaskReport) 
deserializedReport;
+    Assert.assertEquals(KillTaskReport.REPORT_KEY, 
deserializedKillReport.getReportKey());
+    Assert.assertEquals(taskId, deserializedKillReport.getTaskId());
+    Assert.assertEquals(stats, deserializedKillReport.getPayload());
+  }
+
+  private KillTaskReport.Stats getReportedStats()
+  {
+    try {
+      Object payload = getObjectMapper().readValue(
+          taskRunner.getTaskReportsFile(),
+          new TypeReference<Map<String, TaskReport>>() { }
+      ).get(KillTaskReport.REPORT_KEY).getPayload();
+      return getObjectMapper().convertValue(payload, 
KillTaskReport.Stats.class);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error while reading task report");
+    }
+  }
+
   private static DataSegment newSegment(Interval interval, String version)
   {
     return new DataSegment(
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 3676d68409..279c6699ff 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-
 import java.util.Objects;
 
 /**
@@ -90,6 +89,14 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
     return interval;
   }
 
+  /**
+   * This field has been deprecated as "kill" tasks should not be responsible 
for
+   * marking segments as unused. Instead, users should call the Coordinator API
+   * {@code /{dataSourceName}/markUnused} to explicitly mark segments as 
unused.
+   * Segments may also be marked unused by the Coordinator if they become 
overshadowed
+   * or have a {@code DropRule} applied to them.
+   */
+  @Deprecated
   @JsonProperty
   public Boolean getMarkAsUnused()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to