This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a041933 Allow overlapping intervals for the compaction task (#10912)
a041933 is described below
commit a04193301732b08977b2085f2f858751ab614141
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Mar 23 11:21:54 2021 -0700
Allow overlapping intervals for the compaction task (#10912)
* Allow overlapping intervals for the compaction task
* unused import
* line indentation
Co-authored-by: Maytas Monsereenusorn <[email protected]>
---
.../common/granularity/IntervalsByGranularity.java | 25 +-------
.../util/common/IntervalsByGranularityTest.java | 60 ++++++++++--------
.../druid/indexing/common/task/CompactionTask.java | 6 +-
.../common/task/CompactionTaskRunTest.java | 74 ++++++++++++++++++++++
.../indexing/granularity/BaseGranularitySpec.java | 9 +--
5 files changed, 117 insertions(+), 57 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
index 7065535..ff076d4 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
@@ -23,16 +23,12 @@ import com.google.common.collect.FluentIterable;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
/**
* Produce a stream of intervals generated by a given set of intervals as
input and a given
@@ -51,19 +47,7 @@ public class IntervalsByGranularity
*/
public IntervalsByGranularity(Collection<Interval> intervals, Granularity
granularity)
{
- // eliminate dups, sort intervals:
- Set<Interval> intervalSet = new HashSet<>(intervals);
- List<Interval> inputIntervals = new ArrayList<>(intervals.size());
- inputIntervals.addAll(intervalSet);
- inputIntervals.sort(Comparators.intervalsByStartThenEnd());
-
- // sanity check
- if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
- throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
- }
-
- // all good:
- sortedNonOverlappingIntervals = inputIntervals;
+ this.sortedNonOverlappingIntervals =
JodaUtils.condenseIntervals(intervals);
this.granularity = granularity;
}
@@ -73,9 +57,8 @@ public class IntervalsByGranularity
*/
public Iterator<Interval> granularityIntervalsIterator()
{
- Iterator<Interval> ite;
if (sortedNonOverlappingIntervals.isEmpty()) {
- ite = Collections.emptyIterator();
+ return Collections.emptyIterator();
} else {
// The filter after transform & concat is to remove duplicats.
// This can happen when condense left intervals that did not overlap but
@@ -85,7 +68,7 @@ public class IntervalsByGranularity
// intervals will be returned, both with the same value
2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
// Thus dups can be created given the right conditions....
final SettableSupplier<Interval> previous = new SettableSupplier<>();
- ite =
FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+ return
FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
.filter(interval -> {
if (previous.get() != null &&
previous.get().equals(interval)) {
return false;
@@ -94,7 +77,5 @@ public class IntervalsByGranularity
return true;
}).iterator();
}
- return ite;
}
-
}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
index a38e6d5..ee01aa0 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.Iterator;
@@ -34,8 +36,8 @@ import java.util.NoSuchElementException;
public class IntervalsByGranularityTest
{
- private static final long SECONDS_IN_YEAR = 31536000;
-
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
@Test
public void testTrivialIntervalExplosion()
@@ -46,17 +48,17 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("DAY")
+ Granularities.DAY
);
// get count:
Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 62 + 365);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(62 + 365, count);
granularityIntervals = intervals.granularityIntervalsIterator();
count = getCountWithNoHasNext(granularityIntervals);
- Assert.assertTrue(count == 62 + 365);
+ Assert.assertEquals(62 + 365, count);
}
@@ -69,13 +71,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("DAY")
+ Granularities.DAY
);
// get count:
Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 61);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(61, count);
}
@@ -88,7 +90,7 @@ public class IntervalsByGranularityTest
Interval first = Intervals.of("2012-01-01T00Z/P1Y");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first),
- Granularity.fromString("SECOND")
+ Granularities.SECOND
);
Assert.assertEquals(
ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
@@ -96,22 +98,29 @@ public class IntervalsByGranularityTest
);
}
+ /**
+ * This test iterates huge intervals (2.5 years) with the SECOND granularity.
+ * The motivation behind this test is ensuring that IntervalsByGranularity
can handle
+ * these huge intervals with a tiny granularity. However, this test takes a
long time
+ * to populate all intervals based on the SECOND granularity (more than 1
min), so
+ * is ignored by default. We should make this test not a unit test, but a
load test.
+ */
+ @Ignore
@Test
- public void testIntervalExplosion()
+ public void testIterateHugeIntervalsWithTinyGranularity()
{
Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("SECOND")
+ Granularities.SECOND
);
// get count:
Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 78537600);
-
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(78537600, count);
}
@Test
@@ -132,7 +141,6 @@ public class IntervalsByGranularityTest
ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
-
}
@Test
@@ -160,10 +168,9 @@ public class IntervalsByGranularityTest
),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
-
}
- @Test(expected = IAE.class)
+ @Test
public void testOverlappingShouldThrow()
{
List<Interval> inputIntervals = ImmutableList.of(
@@ -174,10 +181,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
- Granularity.fromString("DAY")
+ Granularities.DAY
);
- }
+ Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(14, count);
+ }
@Test
public void testWithGranularity()
@@ -190,13 +200,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
- Granularity.fromString("MONTH")
+ Granularities.MONTH
);
// get count:
Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 2);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(2, count);
}
@Test(expected = UnsupportedOperationException.class)
@@ -223,7 +233,7 @@ public class IntervalsByGranularityTest
Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
}
- private long getCount(Iterator<Interval> granularityIntervalIterator)
+ private long verifyIteratorAndReturnIntervalCount(Iterator<Interval>
granularityIntervalIterator)
{
long count = 0;
Interval previous = null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index bdf250a..8234c35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -142,8 +144,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
- private final Granularity segmentGranularity;
- @Nullable
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@@ -207,7 +207,6 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
- this.segmentGranularity = segmentGranularity;
// Prior to apache/druid#10843 users could specify segmentGranularity
using `segmentGranularity`
// Now users should prefer to use `granularitySpec`
// In case users accidentally specify both, and they are conflicting, warn
the user instead of proceeding
@@ -308,6 +307,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return metricsSpec;
}
+ @JsonInclude(Include.NON_NULL)
@JsonProperty
@Nullable
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 95f0aaf..f3ee50c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -799,6 +800,79 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
@Test
+ public void
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact()
throws Exception
+ {
+ // This test fails with segment lock because of the bug reported in
https://github.com/apache/druid/issues/10911.
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ runIndexTask();
+
+ final Set<DataSegment> expectedSegments = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+
+ final Interval partialInterval =
Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+ final CompactionTask partialCompactionTask = builder
+ .interval(partialInterval)
+ .segmentGranularity(Granularities.MINUTE)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> partialCompactionResult =
runTask(partialCompactionTask);
+ Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+ // All segments in the previous expectedSegments should still appear as
they have larger segment granularity.
+ expectedSegments.addAll(partialCompactionResult.rhs);
+
+ final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+ final CompactionTask fullCompactionTask = builder
+ .interval(Intervals.of("2014-01-01/2014-01-02"))
+ .segmentGranularity(null)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> fullCompactionResult =
runTask(fullCompactionTask);
+ Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+ final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ segmentsAfterFullCompaction.sort(
+ (s1, s2) ->
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(),
s2.getInterval())
+ );
+
+ Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+ for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+ Assert.assertEquals(
+ Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d",
i, i + 1)),
+ segmentsAfterFullCompaction.get(i).getInterval()
+ );
+ }
+ }
+
+ @Test
public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws
Exception
{
runIndexTask();
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
index 779952b..ac9fdca 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec
public static final Granularity DEFAULT_SEGMENT_GRANULARITY =
Granularities.DAY;
public static final Granularity DEFAULT_QUERY_GRANULARITY =
Granularities.NONE;
- protected List<Interval> inputIntervals;
+ protected final List<Interval> inputIntervals;
protected final Boolean rollup;
public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
{
- if (inputIntervals != null) {
- this.inputIntervals = ImmutableList.copyOf(inputIntervals);
- } else {
- this.inputIntervals = Collections.emptyList();
- }
+ this.inputIntervals = inputIntervals == null ? Collections.emptyList() :
inputIntervals;
this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]