This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d7c152c82c Add a TaskReport for "kill" tasks (#15023)
d7c152c82c is described below
commit d7c152c82c16983109a303cb4ea5eb03861dbedf
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Sep 23 07:44:27 2023 +0530
Add a TaskReport for "kill" tasks (#15023)
- Add `KillTaskReport` that contains stats for `numSegmentsKilled`,
`numBatchesProcessed`, `numSegmentsMarkedAsUnused`
- Fix bug where exception message had no format specifier but was still being
passed some args.
- Add some comments regarding deprecation of `markAsUnused` flag.
---
.../common/IngestionStatsAndErrorsTaskReport.java | 6 +-
.../druid/indexing/common/KillTaskReport.java | 133 +++++++++++++++++++++
.../apache/druid/indexing/common/TaskReport.java | 3 +-
.../common/task/KillUnusedSegmentsTask.java | 89 +++++++-------
.../common/task/KillUnusedSegmentsTaskTest.java | 54 +++++++--
.../ClientKillUnusedSegmentsTaskQuery.java | 9 +-
6 files changed, 237 insertions(+), 57 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
index 9d9d866853..35ae2f6698 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
@@ -28,11 +29,12 @@ public class IngestionStatsAndErrorsTaskReport implements
TaskReport
public static final String REPORT_KEY = "ingestionStatsAndErrors";
@JsonProperty
- private String taskId;
+ private final String taskId;
@JsonProperty
- private IngestionStatsAndErrorsTaskReportData payload;
+ private final IngestionStatsAndErrorsTaskReportData payload;
+ @JsonCreator
public IngestionStatsAndErrorsTaskReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("payload") IngestionStatsAndErrorsTaskReportData payload
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
new file mode 100644
index 0000000000..f97f761166
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/KillTaskReport.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+public class KillTaskReport implements TaskReport
+{
+ public static final String REPORT_KEY = "killUnusedSegments";
+
+ private final String taskId;
+ private final Stats stats;
+
+ @JsonCreator
+ public KillTaskReport(
+ @JsonProperty("taskId") String taskId,
+ @JsonProperty("payload") Stats stats
+ )
+ {
+ this.taskId = taskId;
+ this.stats = stats;
+ }
+
+ @Override
+ @JsonProperty
+ public String getTaskId()
+ {
+ return taskId;
+ }
+
+ @Override
+ public String getReportKey()
+ {
+ return REPORT_KEY;
+ }
+
+ @Override
+ @JsonProperty
+ public Object getPayload()
+ {
+ return stats;
+ }
+
+ public static class Stats
+ {
+ private final int numSegmentsKilled;
+ private final int numBatchesProcessed;
+ private final Integer numSegmentsMarkedAsUnused;
+
+ @JsonCreator
+ public Stats(
+ @JsonProperty("numSegmentsKilled") int numSegmentsKilled,
+ @JsonProperty("numBatchesProcessed") int numBatchesProcessed,
+ @JsonProperty("numSegmentsMarkedAsUnused") @Nullable Integer
numSegmentsMarkedAsUnused
+ )
+ {
+ this.numSegmentsKilled = numSegmentsKilled;
+ this.numBatchesProcessed = numBatchesProcessed;
+ this.numSegmentsMarkedAsUnused = numSegmentsMarkedAsUnused;
+ }
+
+ @JsonProperty
+ public int getNumSegmentsKilled()
+ {
+ return numSegmentsKilled;
+ }
+
+ @JsonProperty
+ public int getNumBatchesProcessed()
+ {
+ return numBatchesProcessed;
+ }
+
+ @Nullable
+ @JsonProperty
+ public Integer getNumSegmentsMarkedAsUnused()
+ {
+ return numSegmentsMarkedAsUnused;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Stats that = (Stats) o;
+ return numSegmentsKilled == that.numSegmentsKilled
+ && numBatchesProcessed == that.numBatchesProcessed
+ && Objects.equals(this.numSegmentsMarkedAsUnused,
that.numSegmentsMarkedAsUnused);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(numSegmentsKilled, numBatchesProcessed,
numSegmentsMarkedAsUnused);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Stats{" +
+ "numSegmentsKilled=" + numSegmentsKilled +
+ ", numBatchesProcessed=" + numBatchesProcessed +
+ ", numSegmentsMarkedAsUnused=" + numSegmentsMarkedAsUnused +
+ '}';
+ }
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 5e2f5b6c74..2ac260d72f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -31,7 +31,8 @@ import java.util.Map;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value =
IngestionStatsAndErrorsTaskReport.class)
+ @JsonSubTypes.Type(name = "ingestionStatsAndErrors", value =
IngestionStatsAndErrorsTaskReport.class),
+ @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value =
KillTaskReport.class)
})
public interface TaskReport
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 35653aa230..be1d02f429 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -24,12 +24,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
@@ -46,7 +47,6 @@ import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
* The client representation of this task is {@link
ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
+ * <p>
+ * The field {@link #isMarkAsUnused()} is now deprecated.
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -77,18 +79,18 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
*/
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
+ @Deprecated
private final boolean markAsUnused;
/**
* Split processing to try and keep each nuke operation relatively short, in
the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;
- @Nullable private final Integer limit;
-
- // counters included primarily for testing
- private int numSegmentsKilled = 0;
- private long numBatchesProcessed = 0;
+ /**
+ * Maximum number of segments that can be killed.
+ */
+ @Nullable private final Integer limit;
@JsonCreator
public KillUnusedSegmentsTask(
@@ -109,22 +111,26 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
);
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize :
DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
- Preconditions.checkArgument(this.batchSize > 0, "batchSize should be
greater than zero");
- if (null != limit && limit <= 0) {
- throw InvalidInput.exception(
- "limit [%d] is invalid. It must be a positive integer.",
- limit
- );
+ if (this.batchSize <= 0) {
+ throw InvalidInput.exception("batchSize[%d] must be a positive
integer.", limit);
}
- if (limit != null && markAsUnused != null && markAsUnused) {
- throw InvalidInput.exception(
- "limit cannot be provided with markAsUnused.",
- limit
- );
+ if (limit != null && limit <= 0) {
+ throw InvalidInput.exception("Limit[%d] must be a positive integer.",
limit);
+ }
+ if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
+ throw InvalidInput.exception("Limit cannot be provided when markAsUnused
is enabled.");
}
this.limit = limit;
}
+ /**
+ * This field has been deprecated as "kill" tasks should not be responsible
for
+ * marking segments as unused. Instead, users should call the Coordinator API
+ * {@code /{dataSourceName}/markUnused} to explicitly mark segments as
unused.
+ * Segments may also be marked unused by the Coordinator if they become
overshadowed
+ * or have a {@code DropRule} applied to them.
+ */
+ @Deprecated
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isMarkAsUnused()
@@ -160,30 +166,22 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return ImmutableSet.of();
}
- @JsonIgnore
- @VisibleForTesting
- long getNumBatchesProcessed()
- {
- return numBatchesProcessed;
- }
-
- @JsonIgnore
- @VisibleForTesting
- long getNumSegmentsKilled()
- {
- return numSegmentsKilled;
- }
-
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap =
getTaskLockMap(toolbox.getTaskActionClient());
+ // Track stats for reporting
+ int numSegmentsKilled = 0;
+ int numBatchesProcessed = 0;
+ final int numSegmentsMarkedAsUnused;
if (markAsUnused) {
- int numMarked = toolbox.getTaskActionClient().submit(
+ numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
- LOG.info("Marked %d segments as unused.", numMarked);
+ LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
+ } else {
+ numSegmentsMarkedAsUnused = 0;
}
// List unused segments
@@ -194,7 +192,7 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
"Starting kill with batchSize[%d], up to limit[%d] segments will be
deleted%s",
batchSize,
limit,
- numTotalBatches != null ? StringUtils.format(" in([%d] batches]).",
numTotalBatches) : "."
+ numTotalBatches != null ? StringUtils.format(" in [%d] batches.",
numTotalBatches) : "."
);
do {
if (nextBatchSize <= 0) {
@@ -229,16 +227,21 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (unusedSegments.size() != 0 && (null == numTotalBatches ||
numBatchesProcessed < numTotalBatches));
- LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s].
Deleted total [%d] unused segments "
- + "in [%d] batches.",
- getId(),
- getDataSource(),
- getInterval(),
- numSegmentsKilled,
- numBatchesProcessed
+ final String taskId = getId();
+ LOG.info(
+ "Finished kill task[%s] for dataSource[%s] and interval[%s]."
+ + " Deleted total [%d] unused segments in [%d] batches.",
+ taskId, getDataSource(), getInterval(), numSegmentsKilled,
numBatchesProcessed
+ );
+
+ final KillTaskReport.Stats stats =
+ new KillTaskReport.Stats(numSegmentsKilled, numBatchesProcessed,
numSegmentsMarkedAsUnused);
+ toolbox.getTaskReportFileWriter().write(
+ taskId,
+ TaskReport.buildTaskReports(new KillTaskReport(taskId, stats))
);
- return TaskStatus.success(getId());
+ return TaskStatus.success(taskId);
}
@JsonIgnore
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f320df824a..d77f2e6c24 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -19,12 +19,15 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.KillTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
@@ -35,13 +38,14 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
private static final String DATA_SOURCE = "dataSource";
- private TaskRunner taskRunner;
+ private TestTaskRunner taskRunner;
@Before
public void setup()
@@ -99,8 +103,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
- Assert.assertEquals(2L, task.getNumBatchesProcessed());
- Assert.assertEquals(1, task.getNumSegmentsKilled());
+ Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
}
@@ -148,8 +151,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
- Assert.assertEquals(2L, task.getNumBatchesProcessed());
- Assert.assertEquals(1, task.getNumSegmentsKilled());
+
+ Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats());
}
@Test
@@ -209,8 +212,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
Assert.assertEquals(Collections.emptyList(), unusedSegments);
- Assert.assertEquals(4L, task.getNumBatchesProcessed());
- Assert.assertEquals(4, task.getNumSegmentsKilled());
+ Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
}
@Test
@@ -246,8 +248,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
Assert.assertEquals(Collections.emptyList(), unusedSegments);
- Assert.assertEquals(3L, task.getNumBatchesProcessed());
- Assert.assertEquals(4, task.getNumSegmentsKilled());
+
+ Assert.assertEquals(new KillTaskReport.Stats(4, 3, 4), getReportedStats());
}
@Test
@@ -362,6 +364,38 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Assert.assertEquals(2, (int) task.getNumTotalBatches());
}
+ @Test
+ public void testKillTaskReportSerde() throws Exception
+ {
+ final String taskId = "test_serde_task";
+
+ final KillTaskReport.Stats stats = new KillTaskReport.Stats(1, 2, 3);
+ KillTaskReport report = new KillTaskReport(taskId, stats);
+
+ String json = getObjectMapper().writeValueAsString(report);
+ TaskReport deserializedReport = getObjectMapper().readValue(json,
TaskReport.class);
+ Assert.assertTrue(deserializedReport instanceof KillTaskReport);
+
+ KillTaskReport deserializedKillReport = (KillTaskReport)
deserializedReport;
+ Assert.assertEquals(KillTaskReport.REPORT_KEY,
deserializedKillReport.getReportKey());
+ Assert.assertEquals(taskId, deserializedKillReport.getTaskId());
+ Assert.assertEquals(stats, deserializedKillReport.getPayload());
+ }
+
+ private KillTaskReport.Stats getReportedStats()
+ {
+ try {
+ Object payload = getObjectMapper().readValue(
+ taskRunner.getTaskReportsFile(),
+ new TypeReference<Map<String, TaskReport>>() { }
+ ).get(KillTaskReport.REPORT_KEY).getPayload();
+ return getObjectMapper().convertValue(payload,
KillTaskReport.Stats.class);
+ }
+ catch (Exception e) {
+ throw new ISE(e, "Error while reading task report");
+ }
+ }
+
private static DataSegment newSegment(Interval interval, String version)
{
return new DataSegment(
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 3676d68409..279c6699ff 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-
import java.util.Objects;
/**
@@ -90,6 +89,14 @@ public class ClientKillUnusedSegmentsTaskQuery implements
ClientTaskQuery
return interval;
}
+ /**
+ * This field has been deprecated as "kill" tasks should not be responsible
for
+ * marking segments as unused. Instead, users should call the Coordinator API
+ * {@code /{dataSourceName}/markUnused} to explicitly mark segments as
unused.
+ * Segments may also be marked unused by the Coordinator if they become
overshadowed
+ * or have a {@code DropRule} applied to them.
+ */
+ @Deprecated
@JsonProperty
public Boolean getMarkAsUnused()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]