This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9056d11 Add docs and integration tests for Auto-compaction snapshot status API (#10510)
9056d11 is described below
commit 9056d113d0a605671e7ee3fb2d4341671e600997
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Oct 14 06:42:22 2020 -0700
Add docs and integration tests for Auto-compaction snapshot status API (#10510)
* add docs and IT for Auto-compaction snapshot status API
* fix spellings
* fix test
* address comments
---
docs/operations/api-reference.md | 26 ++++++
docs/operations/metrics.md | 14 +++-
.../clients/CompactionResourceTestClient.java | 18 +++++
.../coordinator/duty/ITAutoCompactionTest.java | 92 +++++++++++++++++++++-
4 files changed, 147 insertions(+), 3 deletions(-)
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index a3610a8..5c6351f 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -410,6 +410,32 @@ Returns total size and count for each datasource for each interval within given
Returns the total size of segments awaiting compaction for the given dataSource.
This is only valid for dataSource which has compaction enabled.
+##### GET
+
+* `/druid/coordinator/v1/compaction/status`
+
+Returns the status and statistics from the latest auto compaction run of all dataSources that have (or have had) auto compaction enabled.
+The response payload includes a list of `latestStatus` objects. Each `latestStatus` entry represents the status for one such dataSource.
+The `latestStatus` object has the following keys:
+* `dataSource`: name of the datasource for this status information
+* `scheduleStatus`: auto compaction scheduling status. Possible values are `NOT_ENABLED` and `RUNNING`. Returns `RUNNING` if the dataSource has an active auto compaction config submitted; otherwise, returns `NOT_ENABLED`.
+* `bytesAwaitingCompaction`: total bytes of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).
+* `bytesCompacted`: total bytes of this datasource that are already compacted with the spec set in the auto compaction config.
+* `bytesSkipped`: total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
+* `segmentCountAwaitingCompaction`: total number of segments of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).
+* `segmentCountCompacted`: total number of segments of this datasource that are already compacted with the spec set in the auto compaction config.
+* `segmentCountSkipped`: total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
+* `intervalCountAwaitingCompaction`: total number of intervals of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).
+* `intervalCountCompacted`: total number of intervals of this datasource that are already compacted with the spec set in the auto compaction config.
+* `intervalCountSkipped`: total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
+
+##### GET
+
+* `/druid/coordinator/v1/compaction/status?dataSource={dataSource}`
+
+Similar to the API `/druid/coordinator/v1/compaction/status` above, but filters the response to only return information for the {dataSource} given.
+Note that the {dataSource} given must have (or have had) auto compaction enabled.
+
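As a quick illustration of the payload the new endpoint returns: a minimal sketch that parses a response of the documented shape. The key set and the `latestStatus` wrapper come from this change; the dataSource name and the numeric values are hypothetical examples, not real output.

```python
import json

# Illustrative payload for GET /druid/coordinator/v1/compaction/status.
# The shape follows the `latestStatus` keys documented above; the
# dataSource name and the numbers are made-up examples.
raw = """
{
  "latestStatus": [
    {
      "dataSource": "wikipedia",
      "scheduleStatus": "RUNNING",
      "bytesAwaitingCompaction": 0,
      "bytesCompacted": 14312,
      "bytesSkipped": 0,
      "segmentCountAwaitingCompaction": 0,
      "segmentCountCompacted": 2,
      "segmentCountSkipped": 0,
      "intervalCountAwaitingCompaction": 0,
      "intervalCountCompacted": 1,
      "intervalCountSkipped": 0
    }
  ]
}
"""

payload = json.loads(raw)
# One entry per dataSource that has (or had) auto compaction enabled.
for status in payload["latestStatus"]:
    done = status["bytesCompacted"]
    waiting = status["bytesAwaitingCompaction"]
    print(f'{status["dataSource"]}: {status["scheduleStatus"]}, '
          f'{done} bytes compacted, {waiting} bytes awaiting')
```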
#### Compaction Configuration
##### GET
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 95bc345..933f131 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -232,13 +232,25 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`segment/dropQueue/count`|Number of segments to drop.|server.|Varies.|
|`segment/size`|Total size of used segments in a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|Varies.|
|`segment/count`|Number of used segments belonging to a data source. Emitted only for data sources to which at least one used segment belongs.|dataSource.|< max|
-|`segment/overShadowed/count`|Number of overshadowed segments.||Varies.|
+|`segment/overShadowed/count`|Number of overshadowed segments.| |Varies.|
|`segment/unavailable/count`|Number of segments (not including replicas) left to load until segments that should be loaded in the cluster are available for queries.|dataSource.|0|
|`segment/underReplicated/count`|Number of segments (including replicas) left to load until segments that should be loaded in the cluster are available for queries.|tier, dataSource.|0|
|`tier/historical/count`|Number of available historical nodes in each tier.|tier.|Varies.|
|`tier/replication/factor`|Configured maximum replication factor in each tier.|tier.|Varies.|
|`tier/required/capacity`|Total capacity in bytes required in each tier.|tier.|Varies.|
|`tier/total/capacity`|Total capacity in bytes available in each tier.|tier.|Varies.|
+|`compact/task/count`|Number of tasks issued in the auto compaction run.| |Varies.|
+|`compactTask/maxSlot/count`|Maximum number of task slots that can be used for auto compaction tasks in the auto compaction run.| |Varies.|
+|`compactTask/availableSlot/count`|Number of available task slots that can be used for auto compaction tasks in the auto compaction run (the maximum slot count minus any currently running compaction tasks).| |Varies.|
+|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
+|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
+|`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only counting intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
+|`segment/compacted/bytes`|Total bytes of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
+|`segment/compacted/count`|Total number of segments of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
+|`interval/compacted/count`|Total number of intervals of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
+|`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
+|`segment/skipCompact/count`|Total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
+|`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.html#dynamic-configuration), then [log entries](../configuration/logging.md) for class
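The new metric rows mirror the snapshot fields exposed by the status API; a small sketch of that correspondence, summarizing the two lists in this change (not part of the committed docs):

```python
# Correspondence between the auto-compaction snapshot fields returned by
# /druid/coordinator/v1/compaction/status and the per-run Coordinator
# metrics listed above. Both name lists come from this change.
SNAPSHOT_FIELD_TO_METRIC = {
    "bytesAwaitingCompaction": "segment/waitCompact/bytes",
    "segmentCountAwaitingCompaction": "segment/waitCompact/count",
    "intervalCountAwaitingCompaction": "interval/waitCompact/count",
    "bytesCompacted": "segment/compacted/bytes",
    "segmentCountCompacted": "segment/compacted/count",
    "intervalCountCompacted": "interval/compacted/count",
    "bytesSkipped": "segment/skipCompact/bytes",
    "segmentCountSkipped": "segment/skipCompact/count",
    "intervalCountSkipped": "interval/skipCompact/count",
}

for field, metric in SNAPSHOT_FIELD_TO_METRIC.items():
    print(f"{field:32} -> {metric}")
```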
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index 60f3930..c1e9ae4 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -36,6 +36,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
+import java.util.List;
import java.util.Map;
public class CompactionResourceTestClient
@@ -175,4 +176,21 @@ public class CompactionResourceTestClient
}
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, String>>() {});
}
+
+  public Map<String, String> getCompactionStatus(String dataSource) throws Exception
+  {
+    String url = StringUtils.format("%scompaction/status?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.GET, new URL(url)), responseHandler
+    ).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while getting compaction status status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+    Map<String, List<Map<String, String>>> latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, List<Map<String, String>>>>() {});
+    return latestSnapshots.get("latestStatus").get(0);
+  }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index f643086..7138f24 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -52,6 +53,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
@Test(groups = {TestNGGroup.COMPACTION})
@@ -97,13 +99,36 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
-
+ getAndAssertCompactionStatus(
+ fullDatasourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 0,
+ 14312,
+ 0,
+ 0,
+ 2,
+ 0,
+ 0,
+ 1,
+ 0);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
+ getAndAssertCompactionStatus(
+ fullDatasourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 0,
+ 22489,
+ 0,
+ 0,
+ 3,
+ 0,
+ 0,
+ 2,
+ 0);
}
}
@@ -200,7 +225,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
-
+ getAndAssertCompactionStatus(
+ fullDatasourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0);
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
@@ -208,6 +244,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
+ getAndAssertCompactionStatus(
+ fullDatasourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 14312,
+ 14311,
+ 0,
+ 2,
+ 2,
+ 0,
+ 1,
+ 1,
+ 0);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
@@ -215,6 +263,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
+ getAndAssertCompactionStatus(
+ fullDatasourceName,
+ AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
+ 0,
+ 22489,
+ 0,
+ 0,
+ 3,
+ 0,
+ 0,
+ 2,
+ 0);
}
}
@@ -398,4 +458,32 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
}
+
+  private void getAndAssertCompactionStatus(
+      String fullDatasourceName,
+      AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
+      long bytesAwaitingCompaction,
+      long bytesCompacted,
+      long bytesSkipped,
+      long segmentCountAwaitingCompaction,
+      long segmentCountCompacted,
+      long segmentCountSkipped,
+      long intervalCountAwaitingCompaction,
+      long intervalCountCompacted,
+      long intervalCountSkipped
+  ) throws Exception
+  {
+    Map<String, String> actualStatus = compactionResource.getCompactionStatus(fullDatasourceName);
+    Assert.assertNotNull(actualStatus);
+    Assert.assertEquals(actualStatus.get("scheduleStatus"), scheduleStatus.toString());
+    Assert.assertEquals(Long.parseLong(actualStatus.get("bytesAwaitingCompaction")), bytesAwaitingCompaction);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("bytesCompacted")), bytesCompacted);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("bytesSkipped")), bytesSkipped);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountAwaitingCompaction")), segmentCountAwaitingCompaction);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountCompacted")), segmentCountCompacted);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountSkipped")), segmentCountSkipped);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountAwaitingCompaction")), intervalCountAwaitingCompaction);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountCompacted")), intervalCountCompacted);
+    Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountSkipped")), intervalCountSkipped);
+  }
}