This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb653ce Add benchmark for VersionedIntervalTimeline (#8161)
fb653ce is described below
commit fb653ceef9fc778b93562d4708af3b5cb87eef02
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Jul 30 08:10:00 2019 -0700
Add benchmark for VersionedIntervalTimeline (#8161)
* Add benchmark for VersionedIntervalTimeline
* rename
---
.../VersionedIntervalTimelineBenchmark.java | 270 +++++++++++++++++++++
.../druid/indexing/common/task/KillTaskTest.java | 2 +-
2 files changed, 271 insertions(+), 1 deletion(-)
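For anyone who wants to try the new class locally: it is a standard JMH benchmark, so besides the usual benchmarks-jar route it can be driven through JMH's programmatic runner. The launcher below is only a rough sketch (the class name is made up and it is not part of this commit); it uses the stock JMH API and picks up the @Fork/@Warmup/@Measurement/@Param settings declared on the benchmark class itself.

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical launcher, not part of this commit.
    public class VersionedIntervalTimelineBenchmarkRunner
    {
      public static void main(String[] args) throws RunnerException
      {
        // Match the benchmark class by name; fork/warmup/measurement settings
        // come from the annotations on the benchmark class itself.
        Options options = new OptionsBuilder()
            .include("VersionedIntervalTimelineBenchmark")
            .build();
        new Runner(options).run();
      }
    }
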
diff --git a/benchmarks/src/main/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java b/benchmarks/src/main/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java
new file mode 100644
index 0000000..08a5674
--- /dev/null
+++ b/benchmarks/src/main/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgsAppend = {"-XX:+UseG1GC"})
+@Warmup(iterations = 10)
+@Measurement(iterations = 10)
+@BenchmarkMode({Mode.Throughput})
+public class VersionedIntervalTimelineBenchmark
+{
+ private static final String DATA_SOURCE = "dataSource";
+ private static final Interval TOTAL_INTERVAL = Intervals.of("2018/2019");
+ private static final double NEW_ROOT_GEN_SEGMENTS_RATIO_AFTER_COMPACTION = 0.1;
+ private static final double COMPACTED_SEGMENTS_RATIO_TO_INITIAL_SEGMENTS = 0.5;
+
+ @Param({"10", "100", "1000"})
+ private int numInitialRootGenSegmentsPerInterval;
+
+ @Param({"1", "5"})
+ private int numNonRootGenerations;
+
+ @Param({"false", "true"})
+ private boolean useSegmentLock;
+
+ @Param({"MONTH", "DAY"})
+ private GranularityType segmentGranularity;
+
+ private List<Interval> intervals;
+ private List<DataSegment> segments;
+ private VersionedIntervalTimeline<String, DataSegment> timeline;
+ private List<DataSegment> newSegments;
+
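+ // Builds the benchmark state: per-interval root-generation segments, numNonRootGenerations
+ // rounds of overwrites (NumberedOverwriteShardSpec with segment lock, a new major version
+ // otherwise), the resulting timeline, and the new segments that benchAdd will insert.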
+ @Setup
+ public void setup()
+ {
+ final int numNewRootGenSegmentsAfterCompaction =
+ (int) (numInitialRootGenSegmentsPerInterval * NEW_ROOT_GEN_SEGMENTS_RATIO_AFTER_COMPACTION);
+ final int numCompactedSegments =
+ (int) (numInitialRootGenSegmentsPerInterval * COMPACTED_SEGMENTS_RATIO_TO_INITIAL_SEGMENTS);
+
+ intervals = Lists.newArrayList(segmentGranularity.getDefaultGranularity().getIterable(TOTAL_INTERVAL));
+ segments = new ArrayList<>(intervals.size() * numInitialRootGenSegmentsPerInterval);
+ Map<Interval, Integer> nextRootGenPartitionIds = new HashMap<>(intervals.size());
+ Map<Interval, Integer> nextNonRootGenPartitionIds = new HashMap<>(intervals.size());
+ Map<Interval, Short> nextMinorVersions = new HashMap<>(intervals.size());
+
+ DateTime majorVersion = DateTimes.nowUtc();
+
+ for (Interval interval : intervals) {
+ majorVersion = majorVersion.plus(1);
+ int nextRootGenPartitionId = 0;
+ int nextNonRootGenPartitionId = PartitionIds.NON_ROOT_GEN_START_PARTITION_ID;
+
+ // Generate root generation segments
+ for (int i = 0; i < numInitialRootGenSegmentsPerInterval; i++) {
+ segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
+ }
+
+ for (int i = 0; i < numNonRootGenerations; i++) {
+ if (!useSegmentLock) {
+ majorVersion = majorVersion.plus(1);
+ nextRootGenPartitionId = 0;
+ }
+ // Compacted segments
+ for (int j = 0; j < numCompactedSegments; j++) {
+ if (useSegmentLock) {
+ segments.add(
+ newSegment(
+ interval,
+ majorVersion.toString(),
+ new NumberedOverwriteShardSpec(
+ nextNonRootGenPartitionId++,
+ 0,
+ nextRootGenPartitionId,
+ (short) (i + 1),
+ (short) numCompactedSegments
+ )
+ )
+ );
+ } else {
+ segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
+ }
+ }
+
+ // New segments
+ for (int j = 0; j < numNewRootGenSegmentsAfterCompaction; j++) {
+ segments.add(newSegment(interval, majorVersion.toString(), new NumberedShardSpec(nextRootGenPartitionId++, 0)));
+ }
+ }
+ nextRootGenPartitionIds.put(interval, nextRootGenPartitionId);
+ nextNonRootGenPartitionIds.put(interval, nextNonRootGenPartitionId);
+ nextMinorVersions.put(interval, (short) (numNonRootGenerations + 1));
+ }
+
+ timeline = VersionedIntervalTimeline.forSegments(segments);
+
+ newSegments = new ArrayList<>(200);
+
+ // Generate new appending segments
+ for (int i = 0; i < 100; i++) {
+ final Interval interval = intervals.get(ThreadLocalRandom.current().nextInt(intervals.size()));
+ final int rootPartitionId = nextRootGenPartitionIds.get(interval);
+
+ newSegments.add(
+ newSegment(
+ interval,
+ majorVersion.toString(),
+ new NumberedShardSpec(rootPartitionId, 0)
+ )
+ );
+ nextRootGenPartitionIds.put(interval, rootPartitionId + 1);
+ }
+
+ // Generate overwriting segments
+ if (!useSegmentLock) {
+ majorVersion = majorVersion.plus(1);
+ nextRootGenPartitionIds.keySet().forEach(interval -> nextRootGenPartitionIds.put(interval, 0));
+ }
+
+ final List<Interval> intervalCopies = new ArrayList<>(intervals);
+ for (int i = 0; i < 100 && !intervalCopies.isEmpty(); i++) {
+ final Interval interval = intervalCopies.remove(ThreadLocalRandom.current().nextInt(intervalCopies.size()));
+ int rootPartitionId = nextRootGenPartitionIds.remove(interval);
+ int nonRootPartitionId = nextNonRootGenPartitionIds.remove(interval);
+ final short minorVersion = nextMinorVersions.remove(interval);
+
+ for (int j = 0; j < numCompactedSegments; j++) {
+ if (useSegmentLock) {
+ newSegments.add(
+ newSegment(
+ interval,
+ majorVersion.toString(),
+ new NumberedOverwriteShardSpec(
+ nonRootPartitionId++,
+ 0,
+ rootPartitionId,
+ minorVersion,
+ (short) numCompactedSegments
+ )
+ )
+ );
+ } else {
+ newSegments.add(
+ newSegment(
+ interval,
+ majorVersion.toString(),
+ new NumberedShardSpec(rootPartitionId++, 0)
+ )
+ );
+ }
+ }
+ }
+ }
+
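+ // Measures rebuilding the timeline from the initial segments and adding every pre-generated new segment.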
+ @Benchmark
+ public void benchAdd(Blackhole blackhole)
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments);
+ for (DataSegment newSegment : newSegments) {
+ timeline.add(
+ newSegment.getInterval(),
+ newSegment.getVersion(),
+ newSegment.getShardSpec().createChunk(newSegment)
+ );
+ }
+ }
+
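+ // Measures removing a random 10% of the initial segments from a freshly built timeline.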
+ @Benchmark
+ public void benchRemove(Blackhole blackhole)
+ {
+ final List<DataSegment> segmentsCopy = new ArrayList<>(segments);
+ final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsCopy);
+ final int numTests = (int) (segmentsCopy.size() * 0.1);
+ for (int i = 0; i < numTests; i++) {
+ final DataSegment segment = segmentsCopy.remove(ThreadLocalRandom.current().nextInt(segmentsCopy.size()));
+ blackhole.consume(
+ timeline.remove(
+ segment.getInterval(),
+ segment.getVersion(),
+ segment.getShardSpec().createChunk(segment)
+ )
+ );
+ }
+ }
+
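+ // Measures a lookup over a random query interval spanning three consecutive granularity buckets.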
+ @Benchmark
+ public void benchLookup(Blackhole blackhole)
+ {
+ final int intervalIndex = ThreadLocalRandom.current().nextInt(intervals.size() - 2);
+ final Interval queryInterval = new Interval(
+ intervals.get(intervalIndex).getStart(),
+ intervals.get(intervalIndex + 2).getEnd()
+ );
+ blackhole.consume(timeline.lookup(queryInterval));
+ }
+
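+ // Measures the overshadowed check for a randomly chosen initial segment.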
+ @Benchmark
+ public void benchIsOvershadowed(Blackhole blackhole)
+ {
+ final DataSegment segment = segments.get(ThreadLocalRandom.current().nextInt(segments.size()));
+ blackhole.consume(timeline.isOvershadowed(segment.getInterval(), segment.getVersion(), segment));
+ }
+
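+ // Measures scanning the whole timeline for fully overshadowed segments.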
+ @Benchmark
+ public void benchFindFullyOvershadowed(Blackhole blackhole)
+ {
+ blackhole.consume(timeline.findFullyOvershadowed());
+ }
+
+ private static DataSegment newSegment(Interval interval, String version, ShardSpec shardSpec)
+ {
+ return new DataSegment(
+ DATA_SOURCE,
+ interval,
+ version,
+ null,
+ null,
+ null,
+ shardSpec,
+ 9,
+ 10
+ );
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillTaskTest.java
index cc4334f..322c649 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillTaskTest.java
@@ -59,7 +59,7 @@ public class KillTaskTest extends IngestionTestBase
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
Assert.assertEquals(segments, announced);
-
+
Assert.assertTrue(
getMetadataSegmentManager().markSegmentAsUnused(
newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId().toString()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]