This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0e4750b Granularity interval materialization (#10742)
0e4750b is described below
commit 0e4750bac208d99f4a07545cc2f401f9bcdc1381
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Fri Jan 29 07:02:10 2021 -0700
Granularity interval materialization (#10742)
* Prevent interval materialization for UniformGranularitySpec inside the
overlord
* Change API of bucketIntervals in GranularitySpec to return an
Iterable<Interval>
* Javadoc update, respect inputIntervals contract
* Eliminate dependency on wrappedspec (i.e. ArbitraryGranularity) in
UniformGranularitySpec
* Added one boundary condition test to UniformGranularityTest and fixed
Travis forbidden method errors in IntervalsByGranularity
* Fix Travis style & other checks
* Refactor TreeSet to facilitate re-use in UniformGranularitySpec
* Make sure intervals are unique when there is no segment granularity
* Style/bugspot fixes...
* More travis checks
* Add condensedIntervals method to GranularitySpec and pass it as needed to
the lock method
* Style & PR feedback
* Fixed failing test
* Fixed bug in IntervalsByGranularity iterator where it would return
repeated elements (see added unit tests that were broken before this change)
* Refactor so that we can get the condensed buckets without materializing
the intervals
* Get rid of GranularitySpec::condensedInputIntervals ... not needed
* Travis failures fixes
* Travis checkstyle fix
* Edited/added javadoc comments and a method name (code review feedback)
* Fixed jacoco coverage by moving class and adding more coverage
* Avoid materializing the condensed intervals when locking
* Deal with overlapping intervals
* Remove code and use library code instead
* Refactor intervals by granularity using the FluentIterable, add sanity
checks
* Change !hasNext() to inputIntervals().isEmpty()
* Remove redundant lambda
* Use materialized intervals here since this is outside the overlord (for
performance)
* Name refactor to reflect the fact that bucket intervals are sorted.
* Style fixes
* Removed redundant method and made condensedIntervalsIterator throw IAE
when an element is null, for consistency with other methods in this class (and
because a null interval does not make sense when condensing)
* Remove forbidden api
* Move helper class inside common base class to reduce public space
pollution
---
.../apache/druid/java/util/common/JodaUtils.java | 151 ++++++++++--
.../common/granularity/IntervalsByGranularity.java | 100 ++++++++
.../apache/druid/common/utils/JodaUtilsTest.java | 199 ++++++++++++++-
.../util/common/IntervalsByGranularityTest.java | 266 +++++++++++++++++++++
.../indexer/DetermineHashedPartitionsJob.java | 26 +-
.../druid/indexer/DeterminePartitionsJob.java | 6 +-
.../HadoopDruidDetermineConfigurationJob.java | 3 +-
.../druid/indexer/HadoopDruidIndexerConfig.java | 30 ++-
.../druid/indexer/HadoopDruidIndexerMapper.java | 2 +-
.../HadoopDruidDetermineConfigurationJobTest.java | 5 +-
.../druid/indexer/HadoopIngestionSpecTest.java | 2 +-
.../druid/indexer/IndexGeneratorJobTest.java | 2 +-
.../common/task/AbstractBatchIndexTask.java | 70 +++---
.../indexing/common/task/HadoopIndexTask.java | 21 +-
.../druid/indexing/common/task/IndexTask.java | 18 +-
.../parallel/ParallelIndexSupervisorTask.java | 50 ++--
.../parallel/PartialDimensionDistributionTask.java | 5 +-
.../parallel/PartialHashSegmentGenerateTask.java | 3 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 16 +-
.../granularity/ArbitraryGranularitySpec.java | 70 ++----
.../indexing/granularity/BaseGranularitySpec.java | 140 +++++++++++
.../indexing/granularity/GranularitySpec.java | 19 +-
.../granularity/UniformGranularitySpec.java | 81 ++-----
.../granularity/ArbitraryGranularityTest.java | 48 +++-
.../granularity/UniformGranularityTest.java | 77 ++++--
25 files changed, 1107 insertions(+), 303 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
index a225693..96c49d9 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
@@ -19,16 +19,23 @@
package org.apache.druid.java.util.common;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.TreeSet;
/**
+ *
*/
public class JodaUtils
{
@@ -36,10 +43,17 @@ public class JodaUtils
public static final long MAX_INSTANT = Long.MAX_VALUE / 2;
public static final long MIN_INSTANT = Long.MIN_VALUE / 2;
- public static ArrayList<Interval> condenseIntervals(Iterable<Interval>
intervals)
+ /**
+ * This method will not materialize the input intervals if they represent
+ * a SortedSet (i.e. implement that interface). If not, the method internally
+ * creates a sorted set and populates it with them thus materializing the
+ * intervals in the input.
+ *
+ * @param intervals The Iterable object containing the intervals to condense
+ * @return The condensed intervals
+ */
+ public static List<Interval> condenseIntervals(Iterable<Interval> intervals)
{
- ArrayList<Interval> retVal = new ArrayList<>();
-
final SortedSet<Interval> sortedIntervals;
if (intervals instanceof SortedSet) {
@@ -50,35 +64,122 @@ public class JodaUtils
sortedIntervals.add(interval);
}
}
+ return
ImmutableList.copyOf(condensedIntervalsIterator(sortedIntervals.iterator()));
+ }
- if (sortedIntervals.isEmpty()) {
- return new ArrayList<>();
+ /**
+ * Lazily condenses an iterator of intervals that the caller has already
+ * sorted in ascending order (use Comparators.intervalsByStartThenEnd()).
+ * The input is never copied into a collection, so large streams of intervals
+ * are not materialized.
+ * <p>
+ * Each call to next() on the returned iterator takes the next input interval
+ * and keeps consuming every following interval that abuts or overlaps the
+ * interval built so far, extending its end as needed. The first interval that
+ * neither abuts nor overlaps is left in place and starts the next condensed
+ * interval. This only works because the input is sorted by start.
+ * <p>
+ * All validation happens while iterating, not when this method is called.
+ *
+ * @param sortedIntervals iterator over the intervals to condense, sorted in
+ * ascending order; if null or empty, an empty iterator is returned
+ * @return an iterator over the condensed intervals, in ascending order and
+ * without repeated elements. Null elements are NOT skipped: they cause IAE.
+ * @throws IAE (thrown by the returned iterator's next()) if an element is
+ * null, or if verifyAscendingSortOrder rejects the first interval of a
+ * condensed group compared with the first interval of the previous group.
+ * NOTE(review): that sortedness check is best-effort; intervals merged into a
+ * group are not checked, so not every unsorted input is detected.
+ */
+ public static Iterator<Interval>
condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
+ {
+
+ // A null or already-exhausted input yields an empty iterator.
+ if (sortedIntervals == null || !sortedIntervals.hasNext()) {
+ return Collections.emptyIterator();
}
- Iterator<Interval> intervalsIter = sortedIntervals.iterator();
- Interval currInterval = intervalsIter.next();
- while (intervalsIter.hasNext()) {
- Interval next = intervalsIter.next();
-
- if (currInterval.abuts(next)) {
- currInterval = new Interval(currInterval.getStart(), next.getEnd());
- } else if (currInterval.overlaps(next)) {
- DateTime nextEnd = next.getEnd();
- DateTime currEnd = currInterval.getEnd();
- currInterval = new Interval(
- currInterval.getStart(),
- nextEnd.isAfter(currEnd) ? nextEnd : currEnd
- );
- } else {
- retVal.add(currInterval);
- currInterval = next;
+ // A PeekingIterator lets next() inspect the following interval without
+ // consuming it, so an interval that starts a new group is left in place.
+ final PeekingIterator<Interval> peekingIterator =
Iterators.peekingIterator(sortedIntervals);
+ return new Iterator<Interval>()
+ {
+ // First raw (not yet merged) interval of the group most recently returned
+ // by next(); null until next() has been called once. Used only for the
+ // sortedness check.
+ private Interval previous;
+
+ @Override
+ public boolean hasNext()
+ {
+ return peekingIterator.hasNext();
}
- }
- retVal.add(currInterval);
+ @Override
+ public Interval next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Interval currInterval = peekingIterator.next();
+ if (currInterval == null) {
+ throw new IAE("Element of intervals is null");
+ }
+
+ // Best-effort sortedness check: compares this group's first interval with
+ // the previous group's first interval. NOTE(review): intervals merged in
+ // the loop below are never checked, so some unsorted inputs go undetected.
+ verifyAscendingSortOrder(previous, currInterval);
+
+ previous = currInterval;
+
+ // Absorb every following interval that abuts or overlaps the group built
+ // so far; peek() first so a non-adjacent interval is kept for the next call.
+ while (hasNext()) {
+ Interval next = peekingIterator.peek();
+ if (next == null) {
+ throw new IAE("Element of intervals is null");
+ }
+
+ // NOTE(review): Joda's abuts() is true in either direction; extending to
+ // next.getEnd() is only correct because, for sorted input, next starts
+ // where currInterval ends.
+ if (currInterval.abuts(next)) {
+ currInterval = new Interval(currInterval.getStart(),
next.getEnd());
+ peekingIterator.next();
+ } else if (currInterval.overlaps(next)) {
+ // Keep the later end so a contained interval cannot shrink the group.
+ DateTime nextEnd = next.getEnd();
+ DateTime currEnd = currInterval.getEnd();
+ currInterval = new Interval(
+ currInterval.getStart(),
+ nextEnd.isAfter(currEnd) ? nextEnd : currEnd
+ );
+ peekingIterator.next();
+ } else {
+ // Neither abuts nor overlaps: it starts the next condensed interval.
+ break;
+ }
+ }
+ return currInterval;
+ }
+ };
+ }
+
+ /**
+ * Checks whether an iterable of intervals contains overlapping intervals.
+ * <p>
+ * Only consecutive elements are compared, which relies on the input being
+ * sorted with Comparators.intervalsByStartThenEnd(). Iteration stops at the
+ * first overlapping pair found.
+ *
+ * @param intervals intervals sorted using Comparators.intervalsByStartThenEnd();
+ * a null iterable is treated as containing no overlaps
+ * @return true if some pair of consecutive intervals overlaps, false otherwise
+ * @throws IAE if an element is null, or if verifyAscendingSortOrder rejects a
+ * consecutive pair. NOTE(review): that sortedness check is best-effort and
+ * does not detect every unsorted input.
+ */
+ public static boolean containOverlappingIntervals(Iterable<Interval>
intervals)
+ {
+ if (intervals == null) {
+ return false;
+ }
+ boolean retVal = false;
+ Interval previous = null;
+ for (Interval current : intervals) {
+ if (current == null) {
+ throw new IAE("Intervals should not contain nulls");
+ }
+ verifyAscendingSortOrder(previous, current);
+ // Overlap is tested only against the immediately preceding element.
+ if (previous != null && previous.overlaps(current)) {
+ retVal = true;
+ break;
+ }
+ previous = current;
+ }
return retVal;
}
+ /**
+ * Throws if {@code previous} is non-null and {@code previous.isAfter(current)}.
+ * A null {@code previous} (i.e. {@code current} is the first element) always
+ * passes.
+ * NOTE(review): per Joda-Time, Interval.isAfter(ReadableInterval) means
+ * "entirely after", so a misordered pair that overlaps passes this check.
+ *
+ * @param previous the element seen before current, or null
+ * @param current the element being checked
+ * @throws IAE if previous.isAfter(current)
+ */
+ private static void verifyAscendingSortOrder(Interval previous, Interval
current)
+ {
+ if (previous != null && previous.isAfter(current)) {
+ throw new IAE("Adjacent intervals are not sorted [%s,%s]", previous,
current);
+ }
+ }
+
public static Interval umbrellaInterval(Iterable<Interval> intervals)
{
ArrayList<DateTime> startDates = new ArrayList<>();
@@ -137,4 +238,6 @@ public class JodaUtils
return max;
}
}
+
+
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
new file mode 100644
index 0000000..7065535
--- /dev/null
+++
b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.FluentIterable;
+import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Produces a stream of intervals obtained by applying a given granularity to a
+ * given set of input intervals. The input intervals are copied and sorted in
+ * the constructor. The granularity intervals are generated while iterating
+ * instead of being collected up front.
+ * NOTE(review): laziness relies on Guava FluentIterable.transformAndConcat and
+ * on Granularity.getIterable not building a collection eagerly; confirm the latter.
+ */
+public class IntervalsByGranularity
+{
+ // De-duplicated input intervals, sorted with Comparators.intervalsByStartThenEnd()
+ // and checked in the constructor to contain no overlapping pair.
+ private final List<Interval> sortedNonOverlappingIntervals;
+ private final Granularity granularity;
+
+ /**
+ * @param intervals intervals to which the given granularity is applied;
+ * exact duplicates are removed, but the remaining intervals must not overlap
+ * @param granularity the granularity to apply (NOTE(review): not null-checked here)
+ * @throws IAE if, after removing duplicates, intervals contain at least one
+ * overlapping pair
+ */
+ public IntervalsByGranularity(Collection<Interval> intervals, Granularity
granularity)
+ {
+ // Remove exact duplicates first, then sort. Without this, identical
+ // intervals would be reported as overlapping by the check below.
+ Set<Interval> intervalSet = new HashSet<>(intervals);
+ List<Interval> inputIntervals = new ArrayList<>(intervals.size());
+ inputIntervals.addAll(intervalSet);
+ inputIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+ // Reject overlapping input; the message reports the caller's original collection.
+ if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
+ throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
+ }
+
+ // Input is valid: keep the sorted, de-duplicated list.
+ sortedNonOverlappingIntervals = inputIntervals;
+ this.granularity = granularity;
+ }
+
+ /**
+ * Returns a new iterator over the granularity intervals of every input
+ * interval. They are visited in input order, so the result follows
+ * Comparators.intervalsByStartThenEnd() provided each input's granularity
+ * intervals come out in ascending order (NOTE(review): confirm for
+ * Granularity.getIterable). Consecutive equal granularity intervals are
+ * emitted only once.
+ *
+ * @return a new iterator; empty when there are no input intervals
+ */
+ public Iterator<Interval> granularityIntervalsIterator()
+ {
+ Iterator<Interval> ite;
+ if (sortedNonOverlappingIntervals.isEmpty()) {
+ ite = Collections.emptyIterator();
+ } else {
+ // The filter after transformAndConcat drops consecutive duplicates.
+ // These appear when distinct, non-overlapping input intervals fall into
+ // the same granularity bucket. For example, with MONTH granularity the
+ // inputs 2013-01-01T00Z/2013-01-10T00Z and 2013-01-15T00Z/2013-01-20T00Z
+ // both map to 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
+ // Only adjacent elements are compared, presumably enough because the
+ // inputs are sorted and non-overlapping.
+ // The filter's state is created per call, so each returned iterator
+ // de-duplicates independently.
+ final SettableSupplier<Interval> previous = new SettableSupplier<>();
+ ite =
FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+ .filter(interval -> {
+ if (previous.get() != null &&
previous.get().equals(interval)) {
+ return false;
+ }
+ previous.set(interval);
+ return true;
+ }).iterator();
+ }
+ return ite;
+ }
+
+}
diff --git
a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
index c5610f9..d7bfb4a 100644
--- a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
@@ -19,8 +19,11 @@
package org.apache.druid.common.utils;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -32,6 +35,7 @@ import java.util.Collections;
import java.util.List;
/**
+ *
*/
public class JodaUtilsTest
{
@@ -75,6 +79,40 @@ public class JodaUtilsTest
Intervals.of("2011-03-05/2011-03-06")
);
+ List<Interval> expected = Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-03"),
+ Intervals.of("2011-02-01/2011-02-08"),
+ Intervals.of("2011-03-01/2011-03-02"),
+ Intervals.of("2011-03-03/2011-03-04"),
+ Intervals.of("2011-03-05/2011-03-06")
+ );
+
+ List<Interval> actual = JodaUtils.condenseIntervals(intervals);
+
+ Assert.assertEquals(
+ expected,
+ actual
+ );
+
+ }
+
+
+ @Test
+ public void testCondenseIntervalsSimpleSortedIterator()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-01-02/2011-01-03"),
+ Intervals.of("2011-02-03/2011-02-08"),
+ Intervals.of("2011-02-01/2011-02-02"),
+ Intervals.of("2011-02-01/2011-02-05"),
+ Intervals.of("2011-03-01/2011-03-02"),
+ Intervals.of("2011-03-03/2011-03-04"),
+ Intervals.of("2011-03-05/2011-03-06")
+ );
+ intervals.sort(Comparators.intervalsByStartThenEnd());
+
+ List<Interval> actual =
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
Assert.assertEquals(
Arrays.asList(
Intervals.of("2011-01-01/2011-01-03"),
@@ -83,7 +121,116 @@ public class JodaUtilsTest
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
),
- JodaUtils.condenseIntervals(intervals)
+ actual
+ );
+
+ }
+
+ /**
+ * Overlapping, nested and duplicate intervals that chain together once sorted
+ * must condense into a single interval spanning all of them.
+ */
+ @Test
+ public void testCondenseIntervalsSimpleSortedIteratorOverlapping()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-02-01/2011-03-10"),
+ Intervals.of("2011-01-02/2011-02-03"),
+ Intervals.of("2011-01-07/2015-01-19"),
+ Intervals.of("2011-01-15/2011-01-19"),
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-02-01/2011-03-10")
+ );
+
+ // condensedIntervalsIterator requires input sorted by start then end.
+ intervals.sort(Comparators.intervalsByStartThenEnd());
+
+ Assert.assertEquals(
+ Collections.singletonList(
+ Intervals.of("2011-01-01/2015-01-19")
+ ),
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
+ );
+ }
+
+ /**
+ * A null element after the first intervals must make iteration throw IAE.
+ * Here it is reached via peek() while the first two intervals are merged.
+ */
+ @Test(expected = IAE.class)
+ public void
testCondenseIntervalsSimplSortedIteratorOverlappingWithNullsShouldThrow()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-01-02/2011-02-03"),
+ Intervals.of("2011-02-01/2011-03-10"),
+ null,
+ Intervals.of("2011-03-07/2011-04-19"),
+ Intervals.of("2011-04-01/2015-01-19"),
+ null
+ );
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+ }
+
+ /**
+ * A null first element must make the first call to next() throw IAE.
+ */
+ @Test(expected = IAE.class)
+ public void
testCondenseIntervalsSimplSortedIteratorOverlappingWithNullFirstAndLastshouldThrow()
+ {
+ List<Interval> intervals = Arrays.asList(
+ null,
+ Intervals.of("2011-01-02/2011-02-03"),
+ Intervals.of("2011-02-01/2011-03-10"),
+ Intervals.of("2011-03-07/2011-04-19"),
+ Intervals.of("2011-04-01/2015-01-19"),
+ null
+ );
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+ }
+
+ /**
+ * Unsorted input must be rejected: 2011-02-01/2011-02-02 lies entirely before
+ * the preceding 2011-02-03/2011-02-08.
+ * NOTE(review): expects IllegalArgumentException, presumably the supertype of
+ * the IAE thrown by the sortedness check; confirm.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCondenseIntervalsSimpleUnsortedIterator()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-01-02/2011-01-03"),
+ Intervals.of("2011-02-03/2011-02-08"),
+ Intervals.of("2011-02-01/2011-02-02"),
+ Intervals.of("2011-02-01/2011-02-05"),
+ Intervals.of("2011-03-01/2011-03-02"),
+ Intervals.of("2011-03-03/2011-03-04"),
+ Intervals.of("2011-03-05/2011-03-06")
+ );
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+ }
+
+ /**
+ * Unsorted input whose earliest interval (in 2010) comes last must be rejected
+ * when that last element is reached.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCondenseIntervalsSimpleUnsortedIteratorSmallestAtEnd()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-02-01/2011-02-04"),
+ Intervals.of("2011-03-01/2011-03-04"),
+ Intervals.of("2010-01-01/2010-03-04")
+ );
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+ }
+
+ /**
+ * Duplicate intervals in sorted input must be absorbed: each condensed
+ * interval appears once.
+ */
+ @Test
+ public void testCondenseIntervalsIteratorWithDups()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-02-04/2011-02-05"),
+ Intervals.of("2011-01-01/2011-01-02"),
+ Intervals.of("2011-01-02/2011-01-03"),
+ Intervals.of("2011-02-03/2011-02-08"),
+ Intervals.of("2011-02-03/2011-02-08"),
+ Intervals.of("2011-02-01/2011-02-02"),
+ Intervals.of("2011-02-01/2011-02-05"),
+ Intervals.of("2011-03-01/2011-03-02"),
+ Intervals.of("2011-03-03/2011-03-04"),
+ Intervals.of("2011-03-05/2011-03-06")
+ );
+ intervals.sort(Comparators.intervalsByStartThenEnd());
+
+ Assert.assertEquals(
+ Arrays.asList(
+ Intervals.of("2011-01-01/2011-01-03"),
+ Intervals.of("2011-02-01/2011-02-08"),
+ Intervals.of("2011-03-01/2011-03-02"),
+ Intervals.of("2011-03-03/2011-03-04"),
+ Intervals.of("2011-03-05/2011-03-06")
+ ),
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
);
}
@@ -144,4 +291,54 @@ public class JodaUtilsTest
Assert.assertEquals(Long.MAX_VALUE, period.getMinutes());
}
+ /**
+ * 2011-03-25/2011-04-03 and 2011-04-01/2015-01-19 overlap, so an overlap must
+ * be reported.
+ */
+ @Test
+ public void testShouldContainOverlappingIntervals()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-02-01/2011-03-10"),
+ Intervals.of("2011-03-25/2011-04-03"),
+ Intervals.of("2011-04-01/2015-01-19"),
+ Intervals.of("2016-01-15/2016-01-19")
+ );
+ Assert.assertTrue(JodaUtils.containOverlappingIntervals(intervals));
+ }
+
+
+ /**
+ * Consecutive intervals here either abut (at 2011-03-10) or have gaps, so no
+ * overlap must be reported.
+ */
+ @Test
+ public void testShouldNotContainOverlappingIntervals()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-02-01/2011-03-10"),
+ Intervals.of("2011-03-10/2011-04-03"),
+ Intervals.of("2011-04-04/2015-01-14"),
+ Intervals.of("2016-01-15/2016-01-19")
+ );
+ Assert.assertFalse(JodaUtils.containOverlappingIntervals(intervals));
+ }
+
+ /**
+ * A null element must make containOverlappingIntervals throw IAE.
+ */
+ @Test(expected = IAE.class)
+ public void testOverlappingIntervalsContainsNull()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-02-01/2011-03-10"),
+ null,
+ Intervals.of("2011-04-04/2015-01-14"),
+ Intervals.of("2016-01-15/2016-01-19")
+ );
+ JodaUtils.containOverlappingIntervals(intervals);
+ }
+
+ /**
+ * 2016-01-15/2016-01-19 lies entirely after the following 2011-04-04/2015-01-14,
+ * so the unsorted input must be rejected with IAE.
+ */
+ @Test(expected = IAE.class)
+ public void testOverlappingIntervalsContainsUnsorted()
+ {
+ List<Interval> intervals = Arrays.asList(
+ Intervals.of("2011-02-01/2011-03-10"),
+ Intervals.of("2011-03-10/2011-04-03"),
+ Intervals.of("2016-01-15/2016-01-19"),
+ Intervals.of("2011-04-04/2015-01-14")
+ );
+ JodaUtils.containOverlappingIntervals(intervals);
+ }
+
+
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
new file mode 100644
index 0000000..a38e6d5
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for IntervalsByGranularity: bucket counts and ordering, removal of
+ * repeated buckets, rejection of overlapping input, read-only iterators and
+ * empty input.
+ */
+public class IntervalsByGranularityTest
+{
+ // NOTE(review): not referenced by any test in this class; candidate for removal.
+ private static final long SECONDS_IN_YEAR = 31536000;
+
+
+ @Test
+ public void testTrivialIntervalExplosion()
+ {
+ Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
+ Interval second = Intervals.of("2012-01-01T00Z/2012-02-01T00Z");
+ Interval third = Intervals.of("2002-01-01T00Z/2003-01-01T00Z");
+
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ ImmutableList.of(first, second, third),
+ Granularity.fromString("DAY")
+ );
+
+ // Expected DAY buckets: 31 (Jan 2013) + 31 (Jan 2012) + 365 (year 2002).
+ Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
+ long count = getCount(granularityIntervals);
+ Assert.assertTrue(count == 62 + 365);
+
+ // A fresh iterator must give the same count when driven only through next().
+ granularityIntervals = intervals.granularityIntervalsIterator();
+ count = getCountWithNoHasNext(granularityIntervals);
+ Assert.assertTrue(count == 62 + 365);
+ }
+
+
+ @Test
+ public void testDups()
+ {
+ Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
+ Interval second = Intervals.of("2012-04-01T00Z/2012-05-01T00Z");
+ Interval third = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); // dup
+
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ ImmutableList.of(first, second, third),
+ Granularity.fromString("DAY")
+ );
+
+ // The duplicate is dropped by the constructor: 31 (Jan 2013) + 30 (Apr 2012).
+ Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
+ long count = getCount(granularityIntervals);
+ Assert.assertTrue(count == 61);
+ }
+
+
+ @Test
+ public void testCondenseForManyIntervals()
+ {
+ // Checks that condensedIntervalsIterator can process millions of intervals
+ // (one SECOND bucket per second of 2012) streamed from
+ // granularityIntervalsIterator, without materializing them, and merge them
+ // back into the original year.
+ Interval first = Intervals.of("2012-01-01T00Z/P1Y");
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ ImmutableList.of(first),
+ Granularity.fromString("SECOND")
+ );
+ Assert.assertEquals(
+ ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
+
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator()))
+ );
+ }
+
+ @Test
+ public void testIntervalExplosion()
+ {
+ Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
+ Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
+ Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ ImmutableList.of(first, second, third),
+ Granularity.fromString("SECOND")
+ );
+
+ // Expected SECOND buckets: (365 + 364 + 180) days * 86400 s = 78537600.
+ Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
+ long count = getCount(granularityIntervals);
+ Assert.assertTrue(count == 78537600);
+
+ }
+
+ // Four non-overlapping inputs within January 2012 share one MONTH bucket,
+ // which must be emitted exactly once.
+ @Test
+ public void testSimpleEliminateRepeated()
+ {
+ final List<Interval> inputIntervals = ImmutableList.of(
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+ );
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularities.MONTH
+ );
+
+ Assert.assertEquals(
+ ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
+ ImmutableList.copyOf(intervals.granularityIntervalsIterator())
+ );
+
+ }
+
+ // Unsorted input spanning several years must yield sorted MONTH buckets with
+ // repeats removed; the 2007 input spans both March and April.
+ @Test
+ public void testALittleMoreComplexEliminateRepeated()
+ {
+ final List<Interval> inputIntervals = ImmutableList.of(
+ Intervals.of("2015-01-08T00Z/2015-01-11T00Z"),
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z"),
+ Intervals.of("2007-03-08T00Z/2007-04-11T00Z")
+ );
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularities.MONTH
+ );
+
+ Assert.assertEquals(
+ ImmutableList.of(
+ Intervals.of("2007-03-01T00Z/2007-04-01T00Z"),
+ Intervals.of("2007-04-01T00Z/2007-05-01T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-02-01T00Z"),
+ Intervals.of("2015-01-01T00Z/2015-02-01T00Z")
+ ),
+ ImmutableList.copyOf(intervals.granularityIntervalsIterator())
+ );
+
+ }
+
+ // Overlapping inputs must be rejected by the constructor itself.
+ @Test(expected = IAE.class)
+ public void testOverlappingShouldThrow()
+ {
+ List<Interval> inputIntervals = ImmutableList.of(
+ Intervals.of("2013-01-01T00Z/2013-01-11T00Z"),
+ Intervals.of("2013-01-05T00Z/2013-01-08T00Z"),
+ Intervals.of("2013-01-07T00Z/2013-01-15T00Z")
+ );
+
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularity.fromString("DAY")
+ );
+ }
+
+
+ // Two January inputs share one MONTH bucket, plus one February bucket.
+ @Test
+ public void testWithGranularity()
+ {
+ List<Interval> inputIntervals = ImmutableList.of(
+ Intervals.of("2013-01-01T00Z/2013-01-10T00Z"),
+ Intervals.of("2013-01-15T00Z/2013-01-20T00Z"),
+ Intervals.of("2013-02-07T00Z/2013-02-15T00Z")
+ );
+
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularity.fromString("MONTH")
+ );
+
+ // Expected: January 2013 (once) and February 2013.
+ Iterator<Interval> granularityIntervals =
intervals.granularityIntervalsIterator();
+ long count = getCount(granularityIntervals);
+ Assert.assertTrue(count == 2);
+ }
+
+ // The returned iterator is read-only.
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRemoveThrowsException()
+ {
+ final List<Interval> inputIntervals = ImmutableList.of(
+ Intervals.of("2015-01-08T00Z/2015-01-11T00Z")
+ );
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularities.MONTH
+ );
+ intervals.granularityIntervalsIterator().remove();
+ }
+
+ // No input intervals yields an empty iterator.
+ @Test
+ public void testEmptyInput()
+ {
+ final List<Interval> inputIntervals = Collections.emptyList();
+ IntervalsByGranularity intervals = new IntervalsByGranularity(
+ inputIntervals,
+ Granularities.MONTH
+ );
+ Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
+ }
+
+ // Counts elements using hasNext()/next(), asserting that each interval starts
+ // at or after the end of the previous one (sorted, non-overlapping).
+ private long getCount(Iterator<Interval> granularityIntervalIterator)
+ {
+ long count = 0;
+ Interval previous = null;
+ Interval current;
+ while (granularityIntervalIterator.hasNext()) {
+ current = granularityIntervalIterator.next();
+ if (previous != null) {
+ Assert.assertTrue(previous + "," + current, previous.getEndMillis() <=
current.getStartMillis());
+ }
+ previous = current;
+ count++;
+ }
+ return count;
+ }
+
+ // Same as getCount, but never calls hasNext(): it calls next() until the
+ // iterator throws NoSuchElementException.
+ private long getCountWithNoHasNext(Iterator<Interval>
granularityIntervalIterator)
+ {
+ long count = 0;
+ Interval previous = null;
+ Interval current;
+
+ while (true) {
+ try {
+ current = granularityIntervalIterator.next();
+ }
+ catch (NoSuchElementException e) {
+ // Exhausted: NoSuchElementException is the expected end signal.
+ break;
+ }
+ if (previous != null) {
+ Assert.assertTrue(previous.getEndMillis() <= current.getStartMillis());
+ }
+ previous = current;
+ count++;
+ }
+
+ return count;
+ }
+
+}
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index b1ec546..1d0c3c4 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
@@ -64,7 +65,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
/**
@@ -109,10 +109,10 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
- if (!config.getSegmentGranularIntervals().isPresent()) {
+ if (config.getInputIntervals().isEmpty()) {
groupByJob.setNumReduceTasks(1);
} else {
-
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
+
groupByJob.setNumReduceTasks(Iterators.size(config.getSegmentGranularIntervals().iterator()));
}
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
@@ -151,7 +151,7 @@ public class DetermineHashedPartitionsJob implements Jobby
log.info("Job completed, loading up partitions for intervals[%s].",
config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
- if (!config.getSegmentGranularIntervals().isPresent()) {
+ if (config.getInputIntervals().isEmpty()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem =
intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
@@ -159,7 +159,9 @@ public class DetermineHashedPartitionsJob implements Jobby
}
List<Interval> intervals =
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath),
- new TypeReference<List<Interval>>() {}
+ new TypeReference<List<Interval>>()
+ {
+ }
);
config.setGranularitySpec(
new UniformGranularitySpec(
@@ -182,7 +184,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
HashPartitionFunction partitionFunction = ((HashedPartitionsSpec)
partitionsSpec).getPartitionFunction();
int shardCount = 0;
- for (Interval segmentGranularity :
config.getSegmentGranularIntervals().get()) {
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals())
{
DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath =
config.makeSegmentPartitionInfoPath(segmentGranularity);
@@ -295,11 +297,11 @@ public class DetermineHashedPartitionsJob implements Jobby
super.setup(context);
rollupGranularity =
getConfig().getGranularitySpec().getQueryGranularity();
config =
HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
- Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
- if (intervals.isPresent()) {
+ Iterable<Interval> intervals = config.getSegmentGranularIntervals();
+ if (intervals.iterator().hasNext()) {
determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder =
ImmutableMap.builder();
- for (final Interval bucketInterval : intervals.get()) {
+ for (final Interval bucketInterval : intervals) {
builder.put(bucketInterval,
HyperLogLogCollector.makeLatestCollector());
}
hyperLogLogs = builder.build();
@@ -376,7 +378,7 @@ public class DetermineHashedPartitionsJob implements Jobby
protected void setup(Context context)
{
config =
HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
- determineIntervals = !config.getSegmentGranularIntervals().isPresent();
+ determineIntervals = config.getInputIntervals().isEmpty();
}
@Override
@@ -477,11 +479,11 @@ public class DetermineHashedPartitionsJob implements Jobby
{
this.config = config;
HadoopDruidIndexerConfig hadoopConfig =
HadoopDruidIndexerConfig.fromConfiguration(config);
- if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+ if (!hadoopConfig.getInputIntervals().isEmpty()) {
determineIntervals = false;
int reducerNumber = 0;
ImmutableMap.Builder<LongWritable, Integer> builder =
ImmutableMap.builder();
- for (Interval interval :
hadoopConfig.getSegmentGranularIntervals().get()) {
+ for (Interval interval : hadoopConfig.getSegmentGranularIntervals()) {
builder.put(new LongWritable(interval.getStartMillis()),
reducerNumber++);
}
reducerLookup = builder.build();
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 1e810c6..ebe69cd 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -212,7 +212,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
-
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
+
dimSelectionJob.setNumReduceTasks(Iterators.size(config.getGranularitySpec().sortedBucketIntervals().iterator()));
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
@@ -256,7 +256,7 @@ public class DeterminePartitionsJob implements Jobby
FileSystem fileSystem = null;
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
int shardCount = 0;
- for (Interval segmentGranularity :
config.getSegmentGranularIntervals().get()) {
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals())
{
final Path partitionInfoPath =
config.makeSegmentPartitionInfoPath(segmentGranularity);
if (fileSystem == null) {
fileSystem =
partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
@@ -447,7 +447,7 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder =
ImmutableMap.builder();
int idx = 0;
- for (final Interval bucketInterval :
config.getGranularitySpec().bucketIntervals().get()) {
+ for (final Interval bucketInterval :
config.getGranularitySpec().sortedBucketIntervals()) {
timeIndexBuilder.put(bucketInterval.getStartMillis(), idx);
idx++;
}
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index a4f719a..8b5b4b6 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.TreeMap;
/**
+ *
*/
public class HadoopDruidDetermineConfigurationJob implements Jobby
{
@@ -75,7 +76,7 @@ public class HadoopDruidDetermineConfigurationJob implements
Jobby
}
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
int shardCount = 0;
- for (Interval segmentGranularity :
config.getSegmentGranularIntervals().get()) {
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals())
{
DateTime bucket = segmentGranularity.getStart();
// negative shardsPerInterval means a single shard
List<HadoopyShardSpec> specs =
Lists.newArrayListWithCapacity(shardsPerInterval);
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 0b69b33..54c8f07 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -76,10 +76,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
/**
+ *
*/
public class HadoopDruidIndexerConfig
{
@@ -99,7 +98,7 @@ public class HadoopDruidIndexerConfig
/**
* Hadoop tasks running in an Indexer process need a reference to the
Properties instance created
* in PropertiesModule so that the task sees properties that were specified
in Druid's config files.
- *
+ * <p>
* This is not strictly necessary for Peon-based tasks which have all
properties, including config file properties,
* specified on their command line by ForkingTaskRunner (so they could use
System.getProperties() only),
* but we always use the injected Properties for consistency.
@@ -314,9 +313,9 @@ public class HadoopDruidIndexerConfig
public Optional<List<Interval>> getIntervals()
{
- Optional<SortedSet<Interval>> setOptional =
schema.getDataSchema().getGranularitySpec().bucketIntervals();
- if (setOptional.isPresent()) {
- return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
+ Iterable<Interval> bucketIntervals =
schema.getDataSchema().getGranularitySpec().sortedBucketIntervals();
+ if (bucketIntervals.iterator().hasNext()) {
+ return Optional.of(JodaUtils.condenseIntervals(bucketIntervals));
} else {
return Optional.absent();
}
@@ -426,7 +425,6 @@ public class HadoopDruidIndexerConfig
* Get the proper bucket for some input row.
*
* @param inputRow an InputRow
- *
* @return the Bucket that this row belongs to
*/
Optional<Bucket> getBucket(InputRow inputRow)
@@ -455,14 +453,12 @@ public class HadoopDruidIndexerConfig
}
- Optional<Set<Interval>> getSegmentGranularIntervals()
+ Iterable<Interval> getSegmentGranularIntervals()
{
- return Optional.fromNullable(
+ return
schema.getDataSchema()
.getGranularitySpec()
- .bucketIntervals()
- .orNull()
- );
+ .sortedBucketIntervals();
}
public List<Interval> getInputIntervals()
@@ -474,15 +470,17 @@ public class HadoopDruidIndexerConfig
Optional<Iterable<Bucket>> getAllBuckets()
{
- Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
- if (intervals.isPresent()) {
+ Iterable<Interval> intervals = getSegmentGranularIntervals();
+ if (intervals.iterator().hasNext()) {
return Optional.of(
FunctionalIterable
- .create(intervals.get())
+ .create(intervals)
.transformCat(
input -> {
final DateTime bucketTime = input.getStart();
- final List<HadoopyShardSpec> specs =
schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
+ final List<HadoopyShardSpec> specs =
schema.getTuningConfig()
+ .getShardSpecs()
+
.get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}
diff --git
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
index 02ced6c..5190280 100644
---
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
+++
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
@@ -83,7 +83,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT,
VALUEOUT> extends Mapper<
throw new ParseException(errorMsg);
}
- if (!granularitySpec.bucketIntervals().isPresent()
+ if (granularitySpec.inputIntervals().isEmpty()
||
granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
.isPresent()) {
innerMap(inputRow, context);
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
index 3c809d2..f16ac13 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJobTest
final HadoopDruidIndexerConfig config =
Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+ Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor =
ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
@@ -99,7 +98,7 @@ public class HadoopDruidDetermineConfigurationJobTest
final HadoopDruidIndexerConfig config =
Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+ Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor =
ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
index aed3381..823cdd2 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
@@ -78,7 +78,7 @@ public class HadoopIngestionSpecTest
Assert.assertEquals(
"getIntervals",
Collections.singletonList(Intervals.of("2012-01-01/P1D")),
- granularitySpec.getIntervals().get()
+ granularitySpec.inputIntervals()
);
Assert.assertEquals(
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index 8d59554..c0254c4 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -599,7 +599,7 @@ public class IndexGeneratorJobTest
Map<Long, List<HadoopyShardSpec>> shardSpecs = new
TreeMap<>(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
- for (Interval segmentGranularity :
config.getSegmentGranularIntervals().get()) {
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
List<ShardSpec> specs = constructShardSpecFromShardInfo(partitionType,
shardInfoForEachShard[segmentNum++]);
List<HadoopyShardSpec> actualSpecs =
Lists.newArrayListWithExpectedSize(specs.size());
for (ShardSpec spec : specs) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index f9c725a..4a7fa0d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
@@ -48,6 +47,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -69,7 +69,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -263,40 +262,17 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
}
/**
- * Attempts to acquire a lock that covers the intervals specified in a
certain granularitySpec.
- *
- * This method uses {@link GranularitySpec#bucketIntervals()} to get the
list of intervals to lock, and passes them
- * to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
- *
- * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to
acquire a time chunk or segment lock.
- *
- * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param
intervals} is nonempty, then this method
- * will initialize {@link #taskLockHelper} as a side effect.
- *
- * @return whether the lock was acquired
- */
- protected boolean determineLockGranularityAndTryLock(
- TaskActionClient client,
- GranularitySpec granularitySpec
- ) throws IOException
- {
- final List<Interval> intervals =
granularitySpec.bucketIntervals().isPresent()
- ? new
ArrayList<>(granularitySpec.bucketIntervals().get())
- : Collections.emptyList();
- return determineLockGranularityAndTryLock(client, intervals);
- }
-
- /**
* Attempts to acquire a lock that covers certain intervals.
- *
+ * <p>
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to
acquire a time chunk or segment lock.
- *
+ * <p>
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param
intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
- boolean determineLockGranularityAndTryLock(TaskActionClient client,
List<Interval> intervals) throws IOException
+ public boolean determineLockGranularityAndTryLock(TaskActionClient client,
List<Interval> intervals)
+ throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@@ -325,9 +301,9 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
/**
* Attempts to acquire a lock that covers certain segments.
- *
+ * <p>
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to
acquire a time chunk or segment lock.
- *
+ * <p>
* This method will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
@@ -396,25 +372,33 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
}
}
+
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval>
intervals) throws IOException
{
// The given intervals are first converted to align with segment
granularity. This is because,
// when an overwriting task finds a version for a given input row, it
expects the interval
// associated to each version to be equal or larger than the time bucket
where the input row falls in.
// See ParallelIndexSupervisorTask.findVersion().
- final Set<Interval> uniqueIntervals = new HashSet<>();
+ final Iterator<Interval> intervalIterator;
final Granularity segmentGranularity = getSegmentGranularity();
- for (Interval interval : intervals) {
- if (segmentGranularity == null) {
- uniqueIntervals.add(interval);
- } else {
- Iterables.addAll(uniqueIntervals,
segmentGranularity.getIterable(interval));
- }
+ if (segmentGranularity == null) {
+ intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
+ } else {
+ IntervalsByGranularity intervalsByGranularity = new
IntervalsByGranularity(intervals, segmentGranularity);
+ // the following is calling a condense that does not materialize the
intervals:
+ intervalIterator =
JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
}
- // Condense intervals to avoid creating too many locks.
- for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) {
- final TaskLock lock = client.submit(new
TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
+ // Intervals are already condensed to avoid creating too many locks.
+ // Intervals are also sorted and thus it's safe to compare only the
previous interval and current one for dedup.
+ Interval prev = null;
+ while (intervalIterator.hasNext()) {
+ final Interval cur = intervalIterator.next();
+ if (prev != null && cur.equals(prev)) {
+ continue;
+ }
+ prev = cur;
+ final TaskLock lock = client.submit(new
TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
if (lock == null) {
return false;
}
@@ -525,11 +509,11 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
* If the given firehoseFactory is {@link IngestSegmentFirehoseFactory},
then it finds the segments to lock
* from the firehoseFactory. This is because those segments will be read by
this task no matter what segments would be
* filtered by intervalsToRead, so they need to be locked.
- *
+ * <p>
* However, firehoseFactory is not IngestSegmentFirehoseFactory, it means
this task will overwrite some segments
* with data read from some input source outside of Druid. As a result, only
the segments falling in intervalsToRead
* should be locked.
- *
+ * <p>
* The order of segments within the returned list is unspecified, but each
segment is guaranteed to appear in the list
* only once.
*/
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 1236e35..c11ce31 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -82,7 +82,6 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.SortedSet;
public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
@@ -189,12 +188,10 @@ public class HadoopIndexTask extends HadoopTask
implements ChatHandler
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
- Optional<SortedSet<Interval>> intervals =
spec.getDataSchema().getGranularitySpec().bucketIntervals();
- if (intervals.isPresent()) {
+ Iterable<Interval> intervals =
spec.getDataSchema().getGranularitySpec().sortedBucketIntervals();
+ if (intervals.iterator().hasNext()) {
Interval interval = JodaUtils.umbrellaInterval(
- JodaUtils.condenseIntervals(
- intervals.get()
- )
+ JodaUtils.condenseIntervals(intervals)
);
return taskActionClient.submit(new
TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
} else {
@@ -312,7 +309,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
- boolean determineIntervals =
!spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
+ boolean determineIntervals =
spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
@@ -377,7 +374,7 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
-
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
+
indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
)
);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY,
Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
@@ -621,7 +618,9 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
}
- /** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */
+ /**
+ * Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}.
+ */
@SuppressWarnings("unused")
public static class HadoopDetermineConfigInnerProcessingRunner
{
@@ -770,9 +769,9 @@ public class HadoopIndexTask extends HadoopTask implements
ChatHandler
jobId
});
- return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+ return new String[]{jobId, (res == 0 ? "Success" : "Fail")};
}
- return new String[] {jobId, "Fail"};
+ return new String[]{jobId, "Fail"};
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index ce9da23..88779ab 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -123,7 +123,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -231,7 +230,10 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
throw new UOE("partitionsSpec[%s] is not supported",
tuningConfig.getPartitionsSpec().getClass().getName());
}
}
- return determineLockGranularityAndTryLock(taskActionClient,
ingestionSchema.dataSchema.getGranularitySpec());
+ return determineLockGranularityAndTryLock(
+ taskActionClient,
+ ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
+ );
}
@Override
@@ -452,10 +454,10 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
);
- final boolean determineIntervals = !ingestionSchema.getDataSchema()
- .getGranularitySpec()
- .bucketIntervals()
- .isPresent();
+ final boolean determineIntervals = ingestionSchema.getDataSchema()
+ .getGranularitySpec()
+ .inputIntervals()
+ .isEmpty();
final InputSource inputSource =
ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
@@ -591,7 +593,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
final GranularitySpec granularitySpec =
ingestionSchema.getDataSchema().getGranularitySpec();
// Must determine intervals if unknown, since we acquire all locks before
processing any data.
- final boolean determineIntervals =
!granularitySpec.bucketIntervals().isPresent();
+ final boolean determineIntervals =
granularitySpec.inputIntervals().isEmpty();
// Must determine partitions if rollup is guaranteed and the user didn't
provide a specific value.
final boolean determineNumPartitions =
partitionsSpec.needsDeterminePartitions(false);
@@ -630,7 +632,7 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
@Nonnull DynamicPartitionsSpec partitionsSpec
)
{
- final SortedSet<Interval> intervals =
granularitySpec.bucketIntervals().get();
+ final Iterable<Interval> intervals =
granularitySpec.sortedBucketIntervals();
final int numBucketsPerInterval = 1;
final LinearPartitionAnalysis partitionAnalysis = new
LinearPartitionAnalysis(partitionsSpec);
intervals.forEach(interval -> partitionAnalysis.updateBucket(interval,
numBucketsPerInterval));
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 038e788..bd64fbd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -109,7 +109,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
@@ -141,7 +140,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
* the segment granularity of existing segments until the task reads all
data because we don't know what segments
* are going to be overwritten. As a result, we assume that segment
granularity is going to be changed if intervals
* are missing and force to use timeChunk lock.
- *
+ * <p>
* This variable is initialized in the constructor and used in {@link #run}
to log that timeChunk lock was enforced
* in the task logs.
*/
@@ -194,10 +193,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
ingestionSchema.getDataSchema().getParser()
);
this.missingIntervalsInOverwriteMode =
!ingestionSchema.getIOConfig().isAppendToExisting()
- && !ingestionSchema.getDataSchema()
-
.getGranularitySpec()
-
.bucketIntervals()
- .isPresent();
+ && ingestionSchema.getDataSchema()
+
.getGranularitySpec()
+ .inputIntervals()
+ .isEmpty();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
@@ -320,12 +319,12 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
)
{
return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
- toolbox,
- getId(),
- getGroupId(),
- ingestionSchema,
- getContext(),
- intervalToPartitions
+ toolbox,
+ getId(),
+ getGroupId(),
+ ingestionSchema,
+ getContext(),
+ intervalToPartitions
);
}
@@ -350,7 +349,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
- return determineLockGranularityAndTryLock(taskActionClient,
ingestionSchema.getDataSchema().getGranularitySpec());
+ return determineLockGranularityAndTryLock(
+ taskActionClient,
+ ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
+ );
}
@Override
@@ -513,13 +515,13 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
/**
* Run the multi phase parallel indexing for perfect rollup. In this mode,
the parallel indexing is currently
* executed in two phases.
- *
+ * <p>
* - In the first phase, each task partitions input data and stores those
partitions in local storage.
- * - The partition is created based on the segment granularity (primary
partition key) and the partition dimension
- * values in {@link PartitionsSpec} (secondary partition key).
- * - Partitioned data is maintained by {@link IntermediaryDataManager}.
+ * - The partition is created based on the segment granularity (primary
partition key) and the partition dimension
+ * values in {@link PartitionsSpec} (secondary partition key).
+ * - Partitioned data is maintained by {@link IntermediaryDataManager}.
* - In the second phase, each task reads partitioned data from the
intermediary data server (middleManager
- * or indexer) and merges them to create the final segments.
+ * or indexer) and merges them to create the final segments.
*/
private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws
Exception
{
@@ -817,7 +819,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
private static <S extends PartitionStat, L extends PartitionLocation>
- Map<Pair<Interval, Integer>, List<L>>
groupPartitionLocationsPerPartition(
+ Map<Pair<Interval, Integer>, List<L>>
groupPartitionLocationsPerPartition(
Map<String, ? extends GeneratedPartitionsReport<S>> subTaskIdToReport,
BiFunction<String, S, L> createPartitionLocationFunction
)
@@ -891,7 +893,6 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
* @param index index of partition
* @param total number of items to partition
* @param splits number of desired partitions
- *
* @return partition range: [lhs, rhs)
*/
private static Pair<Integer, Integer> getPartitionBoundaries(int index, int
total, int splits)
@@ -1038,7 +1039,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
{
final String dataSource = getDataSource();
final GranularitySpec granularitySpec =
getIngestionSchema().getDataSchema().getGranularitySpec();
- final Optional<SortedSet<Interval>> bucketIntervals =
granularitySpec.bucketIntervals();
+ // This method is called whenever subtasks need to allocate a new segment
via the supervisor task.
+ // As a result, this code is never called in the Overlord. For now using
the materialized intervals
+ // here is ok for performance reasons
+ final Set<Interval> materializedBucketIntervals =
granularitySpec.materializedBucketIntervals();
// List locks whenever allocating a new segment because locks might be
revoked and no longer valid.
final List<TaskLock> locks = toolbox
@@ -1054,7 +1058,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
Interval interval;
String version;
- if (bucketIntervals.isPresent()) {
+ if (!materializedBucketIntervals.isEmpty()) {
// If granularity spec has explicit intervals, we just need to find the
version associated to the interval.
// This is because we should have gotten all required locks up front
when the task starts up.
final Optional<Interval> maybeInterval =
granularitySpec.bucketInterval(timestamp);
@@ -1063,7 +1067,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
}
interval = maybeInterval.get();
- if (!bucketIntervals.get().contains(interval)) {
+ if (!materializedBucketIntervals.contains(interval)) {
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]",
interval, granularitySpec);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 6bec35d..a0a6179 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -106,7 +106,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
);
}
- @VisibleForTesting // Only for testing
+ @VisibleForTesting
PartialDimensionDistributionTask(
@Nullable String id,
final String groupId,
@@ -334,7 +334,8 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS,
BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY);
}
- @VisibleForTesting // to allow controlling false positive rate of bloom
filter
+ @VisibleForTesting
+ // to allow controlling false positive rate of bloom filter
DedupInputRowFilter(
Granularity queryGranularity,
int bloomFilterExpectedInsertions,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index b252e5d..da22876 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -42,7 +42,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.SortedSet;
import java.util.stream.Collectors;
/**
@@ -194,7 +193,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
// We only care about the intervals in intervalToNumShardsOverride here.
intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
} else {
- final SortedSet<Interval> intervals =
granularitySpec.bucketIntervals().get();
+ final Iterable<Interval> intervals =
granularitySpec.sortedBucketIntervals();
final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
? 1
: partitionsSpec.getNumShards();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2cc9f72..ec019b1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -98,7 +98,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
* the segment granularity of existing segments until the task reads all
data because we don't know what segments
* are going to be overwritten. As a result, we assume that segment
granularity is going to be changed if intervals
* are missing and force to use timeChunk lock.
- *
+ * <p>
* This variable is initialized in the constructor and used in {@link #run}
to log that timeChunk lock was enforced
* in the task logs.
*/
@@ -132,10 +132,10 @@ public class SinglePhaseSubTask extends
AbstractBatchIndexTask
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.missingIntervalsInOverwriteMode =
!ingestionSchema.getIOConfig().isAppendToExisting()
- && !ingestionSchema.getDataSchema()
-
.getGranularitySpec()
-
.bucketIntervals()
- .isPresent();
+ && ingestionSchema.getDataSchema()
+
.getGranularitySpec()
+ .inputIntervals()
+ .isEmpty();
if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
}
@@ -152,7 +152,7 @@ public class SinglePhaseSubTask extends
AbstractBatchIndexTask
{
return determineLockGranularityAndTryLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
- ingestionSchema.getDataSchema().getGranularitySpec()
+ ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}
@@ -263,7 +263,7 @@ public class SinglePhaseSubTask extends
AbstractBatchIndexTask
* If the number of rows added to {@link BaseAppenderatorDriver} so far
exceeds {@link DynamicPartitionsSpec#maxTotalRows}
* </li>
* </ul>
- *
+ * <p>
* At the end of this method, all the remaining segments are published.
*
* @return true if generated segments are successfully published, otherwise
false
@@ -291,7 +291,7 @@ public class SinglePhaseSubTask extends
AbstractBatchIndexTask
final ParallelIndexTuningConfig tuningConfig =
ingestionSchema.getTuningConfig();
final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec)
tuningConfig.getGivenOrDefaultPartitionsSpec();
final long pushTimeout = tuningConfig.getPushTimeout();
- final boolean explicitIntervals =
granularitySpec.bucketIntervals().isPresent();
+ final boolean explicitIntervals =
!granularitySpec.inputIntervals().isEmpty();
final SegmentAllocator segmentAllocator =
SegmentAllocators.forLinearPartitioning(
toolbox,
getId(),
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
index b2e1831..0f97749 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
@@ -21,29 +21,20 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.Comparators;
-import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-public class ArbitraryGranularitySpec implements GranularitySpec
+public class ArbitraryGranularitySpec extends BaseGranularitySpec
{
- private final TreeSet<Interval> intervals;
private final Granularity queryGranularity;
- private final Boolean rollup;
+ protected LookupIntervalBuckets lookupTableBucketByDateTime;
@JsonCreator
public ArbitraryGranularitySpec(
@@ -52,22 +43,15 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
@JsonProperty("intervals") @Nullable List<Interval> inputIntervals
)
{
+ super(inputIntervals, rollup);
this.queryGranularity = queryGranularity == null ? Granularities.NONE :
queryGranularity;
- this.rollup = rollup == null ? Boolean.TRUE : rollup;
- this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
- if (inputIntervals == null) {
- inputIntervals = new ArrayList<>();
- }
-
- // Insert all intervals
- intervals.addAll(inputIntervals);
+ lookupTableBucketByDateTime = new LookupIntervalBuckets(inputIntervals);
// Ensure intervals are non-overlapping (but they may abut each other)
- final PeekingIterator<Interval> intervalIterator =
Iterators.peekingIterator(intervals.iterator());
+ final PeekingIterator<Interval> intervalIterator =
Iterators.peekingIterator(sortedBucketIntervals().iterator());
while (intervalIterator.hasNext()) {
final Interval currentInterval = intervalIterator.next();
-
if (intervalIterator.hasNext()) {
final Interval nextInterval = intervalIterator.peek();
if (currentInterval.overlaps(nextInterval)) {
@@ -86,29 +70,9 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
}
@Override
- @JsonProperty("intervals")
- public Optional<SortedSet<Interval>> bucketIntervals()
- {
- return Optional.of(intervals);
- }
-
- @Override
- public List<Interval> inputIntervals()
- {
- return ImmutableList.copyOf(intervals);
- }
-
- @Override
- public Optional<Interval> bucketInterval(DateTime dt)
+ public Iterable<Interval> sortedBucketIntervals()
{
- // First interval with start time ≤ dt
- final Interval interval = intervals.floor(new Interval(dt, DateTimes.MAX));
-
- if (interval != null && interval.contains(dt)) {
- return Optional.of(interval);
- } else {
- return Optional.absent();
- }
+ return () -> lookupTableBucketByDateTime.iterator();
}
@Override
@@ -118,13 +82,6 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
}
@Override
- @JsonProperty("rollup")
- public boolean isRollup()
- {
- return rollup;
- }
-
- @Override
@JsonProperty("queryGranularity")
public Granularity getQueryGranularity()
{
@@ -143,7 +100,7 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o;
- if (!intervals.equals(that.intervals)) {
+ if (!inputIntervals().equals(that.inputIntervals())) {
return false;
}
if (!rollup.equals(that.rollup)) {
@@ -159,7 +116,7 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
@Override
public int hashCode()
{
- int result = intervals.hashCode();
+ int result = inputIntervals().hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (queryGranularity != null ?
queryGranularity.hashCode() : 0);
return result;
@@ -169,7 +126,7 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
public String toString()
{
return "ArbitraryGranularitySpec{" +
- "intervals=" + intervals +
+ "intervals=" + inputIntervals() +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
@@ -180,4 +137,11 @@ public class ArbitraryGranularitySpec implements
GranularitySpec
{
return new ArbitraryGranularitySpec(queryGranularity, rollup,
inputIntervals);
}
+
+ @Override
+ protected LookupIntervalBuckets getLookupTableBuckets()
+ {
+ return lookupTableBucketByDateTime;
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
new file mode 100644
index 0000000..9ff114b
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing.granularity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * Common base class for {@link GranularitySpec} implementations. It holds the user-supplied input intervals and
+ * the rollup flag, and answers bucket lookups through the {@link LookupIntervalBuckets} supplied by each
+ * subclass.
+ */
+abstract class BaseGranularitySpec implements GranularitySpec
+{
+  // Immutable copy of the user-supplied intervals; never null (empty when none were given).
+  protected final List<Interval> inputIntervals;
+  // Defaults to Boolean.TRUE when not given.
+  protected final Boolean rollup;
+
+  public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
+  {
+    if (inputIntervals != null) {
+      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+    } else {
+      this.inputIntervals = Collections.emptyList();
+    }
+    this.rollup = rollup == null ? Boolean.TRUE : rollup;
+  }
+
+  @Override
+  @JsonProperty("intervals")
+  public List<Interval> inputIntervals()
+  {
+    return inputIntervals;
+  }
+
+  @Override
+  @JsonProperty("rollup")
+  public boolean isRollup()
+  {
+    return rollup;
+  }
+
+  @Override
+  public Optional<Interval> bucketInterval(DateTime dt)
+  {
+    return getLookupTableBuckets().bucketInterval(dt);
+  }
+
+  @Override
+  public TreeSet<Interval> materializedBucketIntervals()
+  {
+    return getLookupTableBuckets().materializedIntervals();
+  }
+
+  /**
+   * @return the bucket lookup structure backing {@link #bucketInterval} and {@link #materializedBucketIntervals}
+   */
+  protected abstract LookupIntervalBuckets getLookupTableBuckets();
+
+  /**
+   * Shares the bucket-lookup code among the GranularitySpec implementations. The bucket intervals are copied into
+   * a sorted set only on first use, so that, in particular, a UniformGranularitySpec can be constructed and its
+   * buckets traversed through sortedBucketIntervals() without materializing them all.
+   */
+  protected static class LookupIntervalBuckets
+  {
+    private final Iterable<Interval> intervalIterable;
+    private final TreeSet<Interval> intervals;
+    // Set once intervalIterable has been copied into intervals; guarded by this object's monitor.
+    private boolean materialized;
+
+    /**
+     * @param intervalIterable the intervals to materialize on demand; null is treated as "no intervals"
+     */
+    public LookupIntervalBuckets(Iterable<Interval> intervalIterable)
+    {
+      this.intervalIterable = intervalIterable;
+      // The tree set is filled on demand (see materializedIntervals()) so that constructing this object stays
+      // cheap even when the number of intervals is very large.
+      this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
+    }
+
+    /**
+     * Returns the bucket interval that contains the given instant, using a floor lookup into the materialized
+     * intervals (which are materialized on the first call).
+     *
+     * @param dt the instant to look up
+     *
+     * @return the interval containing {@code dt}, or {@link Optional#absent()} if no bucket contains it
+     */
+    public Optional<Interval> bucketInterval(DateTime dt)
+    {
+      // Candidate is the interval with the latest start <= dt; it only matches if it actually contains dt.
+      final Interval interval = materializedIntervals().floor(new Interval(dt, DateTimes.MAX));
+      if (interval != null && interval.contains(dt)) {
+        return Optional.of(interval);
+      } else {
+        return Optional.absent();
+      }
+    }
+
+    /**
+     * @return an iterator over the materialized intervals, in the order defined by
+     * Comparators.intervalsByStartThenEnd()
+     */
+    public Iterator<Interval> iterator()
+    {
+      return materializedIntervals().iterator();
+    }
+
+    /**
+     * Copies the intervals into the sorted set on the first call and returns that set. The set is returned
+     * directly (not copied), so callers must not modify it.
+     *
+     * @return the sorted set of materialized intervals; empty when there are none
+     */
+    public synchronized TreeSet<Interval> materializedIntervals()
+    {
+      // This fills shared state behind a read-style API, and a spec instance may be shared across threads (e.g.
+      // presumably by concurrent segment-allocation requests -- not verified here), so the one-time fill is
+      // synchronized; the flag also avoids re-probing an empty iterable on every call.
+      if (!materialized) {
+        if (intervalIterable != null) {
+          Iterators.addAll(intervals, intervalIterable.iterator());
+        }
+        materialized = true;
+      }
+      return intervals;
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
index 9272f69..c3bb579 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
@@ -27,7 +27,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
-import java.util.SortedSet;
+import java.util.TreeSet;
/**
* Tells the indexer how to group events based on timestamp. The events may
then be further partitioned based
@@ -41,11 +41,13 @@ import java.util.SortedSet;
public interface GranularitySpec
{
/**
- * Set of all time groups, broken up on segment boundaries. Should be sorted
by interval start and non-overlapping.
+ * Iterable of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
*
- * @return set of all time groups
+ * @return Iterable of all time groups
*/
- Optional<SortedSet<Interval>> bucketIntervals();
+ Iterable<Interval> sortedBucketIntervals();
+
+
/**
* Returns user provided intervals as-is state. used for configuring
granular path spec
@@ -58,11 +60,18 @@ public interface GranularitySpec
* Time-grouping interval corresponding to some instant, if any.
*
* @param dt instant to return time interval for
- *
* @return optional time interval
*/
Optional<Interval> bucketInterval(DateTime dt);
+ /**
+ * Helper for code paths outside the Overlord where, for performance reasons, the materialized set of bucket
+ * intervals may be needed.
+ * @return an ordered, fast-lookup set of the materialized bucket intervals
+ *
+ */
+ TreeSet<Interval> materializedBucketIntervals();
+
Granularity getSegmentGranularity();
boolean isRollup();
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index fae680a..5fe6a9e 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -21,28 +21,22 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.joda.time.DateTime;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
-import java.util.ArrayList;
import java.util.List;
-import java.util.SortedSet;
-public class UniformGranularitySpec implements GranularitySpec
+public class UniformGranularitySpec extends BaseGranularitySpec
{
private static final Granularity DEFAULT_SEGMENT_GRANULARITY =
Granularities.DAY;
private static final Granularity DEFAULT_QUERY_GRANULARITY =
Granularities.NONE;
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
- private final Boolean rollup;
- private final List<Interval> inputIntervals;
- private final ArbitraryGranularitySpec wrappedSpec;
+ private final IntervalsByGranularity intervalsByGranularity;
+ protected LookupIntervalBuckets lookupTableBucketByDateTime;
@JsonCreator
public UniformGranularitySpec(
@@ -52,21 +46,11 @@ public class UniformGranularitySpec implements
GranularitySpec
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
+ super(inputIntervals, rollup);
this.queryGranularity = queryGranularity == null ?
DEFAULT_QUERY_GRANULARITY : queryGranularity;
- this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.segmentGranularity = segmentGranularity == null ?
DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
-
- if (inputIntervals != null) {
- List<Interval> granularIntervals = new ArrayList<>();
- for (Interval inputInterval : inputIntervals) {
- Iterables.addAll(granularIntervals,
this.segmentGranularity.getIterable(inputInterval));
- }
- this.inputIntervals = ImmutableList.copyOf(inputIntervals);
- this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity,
rollup, granularIntervals);
- } else {
- this.inputIntervals = null;
- this.wrappedSpec = null;
- }
+ intervalsByGranularity = new IntervalsByGranularity(this.inputIntervals,
segmentGranularity);
+ lookupTableBucketByDateTime = new
LookupIntervalBuckets(sortedBucketIntervals());
}
public UniformGranularitySpec(
@@ -79,29 +63,9 @@ public class UniformGranularitySpec implements
GranularitySpec
}
@Override
- public Optional<SortedSet<Interval>> bucketIntervals()
- {
- if (wrappedSpec == null) {
- return Optional.absent();
- } else {
- return wrappedSpec.bucketIntervals();
- }
- }
-
- @Override
- public List<Interval> inputIntervals()
+ public Iterable<Interval> sortedBucketIntervals()
{
- return inputIntervals == null ? ImmutableList.of() :
ImmutableList.copyOf(inputIntervals);
- }
-
- @Override
- public Optional<Interval> bucketInterval(DateTime dt)
- {
- if (wrappedSpec == null) {
- return Optional.absent();
- } else {
- return wrappedSpec.bucketInterval(dt);
- }
+ return () -> intervalsByGranularity.granularityIntervalsIterator();
}
@Override
@@ -112,25 +76,12 @@ public class UniformGranularitySpec implements
GranularitySpec
}
@Override
- @JsonProperty("rollup")
- public boolean isRollup()
- {
- return rollup;
- }
-
- @Override
@JsonProperty("queryGranularity")
public Granularity getQueryGranularity()
{
return queryGranularity;
}
- @JsonProperty("intervals")
- public Optional<List<Interval>> getIntervals()
- {
- return Optional.fromNullable(inputIntervals);
- }
-
@Override
public boolean equals(Object o)
{
@@ -149,14 +100,15 @@ public class UniformGranularitySpec implements
GranularitySpec
if (!queryGranularity.equals(that.queryGranularity)) {
return false;
}
- if (!rollup.equals(that.rollup)) {
+ if (isRollup() != that.isRollup()) {
return false;
}
if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) :
that.inputIntervals != null) {
return false;
}
- return !(wrappedSpec != null ? !wrappedSpec.equals(that.wrappedSpec) :
that.wrappedSpec != null);
+
+ return true;
}
@@ -167,7 +119,6 @@ public class UniformGranularitySpec implements
GranularitySpec
result = 31 * result + queryGranularity.hashCode();
result = 31 * result + rollup.hashCode();
result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode()
: 0);
- result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
return result;
}
@@ -179,7 +130,6 @@ public class UniformGranularitySpec implements
GranularitySpec
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
", inputIntervals=" + inputIntervals +
- ", wrappedSpec=" + wrappedSpec +
'}';
}
@@ -188,4 +138,11 @@ public class UniformGranularitySpec implements
GranularitySpec
{
return new UniformGranularitySpec(segmentGranularity, queryGranularity,
rollup, inputIntervals);
}
+
+ @Override
+ protected LookupIntervalBuckets getLookupTableBuckets()
+ {
+ return lookupTableBucketByDateTime;
+ }
+
}
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
index d7dff7a..a015422 100644
---
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
+++
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -47,7 +48,8 @@ public class ArbitraryGranularityTest
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
- ));
+ )
+ );
Assert.assertNotNull(spec.getQueryGranularity());
}
@@ -57,12 +59,13 @@ public class ArbitraryGranularityTest
final GranularitySpec spec = new ArbitraryGranularitySpec(
Granularities.NONE,
Lists.newArrayList(
- Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
- Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
- Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
- Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
- Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
- ));
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+ )
+ );
Assert.assertTrue(spec.isRollup());
@@ -74,7 +77,17 @@ public class ArbitraryGranularityTest
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
Intervals.of("2012-02-01T00Z/2012-03-01T00Z")
),
- Lists.newArrayList(spec.bucketIntervals().get())
+ Lists.newArrayList(spec.sortedBucketIntervals())
+ );
+
+ Assert.assertEquals(
+ Optional.of(Intervals.of("2012-01-01T00Z/2012-01-03T00Z")),
+ spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
+ );
+
+ Assert.assertEquals(
+ Optional.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z")),
+ spec.bucketInterval(DateTimes.of("2012-01-08T00Z"))
);
Assert.assertEquals(
@@ -118,6 +131,7 @@ public class ArbitraryGranularityTest
Optional.absent(),
spec.bucketInterval(DateTimes.of("2012-01-05T00Z"))
);
+
}
@Test
@@ -187,10 +201,26 @@ public class ArbitraryGranularityTest
try {
final GranularitySpec rtSpec =
JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec),
GranularitySpec.class);
- Assert.assertEquals("Round-trip", spec.bucketIntervals(),
rtSpec.bucketIntervals());
+ Assert.assertEquals(
+ "Round-trip",
+ ImmutableList.copyOf(spec.sortedBucketIntervals()),
+ ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
+ );
+ Assert.assertEquals(
+ "Round-trip",
+ ImmutableList.copyOf(spec.inputIntervals()),
+ ImmutableList.copyOf(rtSpec.inputIntervals())
+ );
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ @Test
+ public void testNullInputIntervals()
+ {
+ final GranularitySpec spec = new
ArbitraryGranularitySpec(Granularities.NONE, null);
+ Assert.assertFalse(spec.sortedBucketIntervals().iterator().hasNext());
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
index 402f7fd..27760f6 100644
---
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
+++
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -34,8 +36,8 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.SortedSet;
public class UniformGranularityTest
{
@@ -44,20 +46,27 @@ public class UniformGranularityTest
@Test
public void testSimple()
{
+
+ final List<Interval> inputIntervals = Lists.newArrayList(
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+ );
final GranularitySpec spec = new UniformGranularitySpec(
Granularities.DAY,
null,
- Lists.newArrayList(
- Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
- Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
- Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
- Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
- )
+ inputIntervals
);
Assert.assertTrue(spec.isRollup());
Assert.assertEquals(
+ inputIntervals,
+ Lists.newArrayList(spec.inputIntervals())
+ );
+
+ Assert.assertEquals(
Lists.newArrayList(
Intervals.of("2012-01-01T00Z/P1D"),
Intervals.of("2012-01-02T00Z/P1D"),
@@ -67,7 +76,28 @@ public class UniformGranularityTest
Intervals.of("2012-01-09T00Z/P1D"),
Intervals.of("2012-01-10T00Z/P1D")
),
- Lists.newArrayList(spec.bucketIntervals().get())
+ Lists.newArrayList(spec.sortedBucketIntervals())
+ );
+
+
+ Assert.assertEquals(
+ Optional.<Interval>absent(),
+ spec.bucketInterval(DateTimes.of("2011-01-12T00Z"))
+ );
+
+ Assert.assertEquals(
+ Optional.of(Intervals.of("2012-01-01T00Z/2012-01-02T00Z")),
+ spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
+ );
+
+ Assert.assertEquals(
+ Optional.of(Intervals.of("2012-01-10T00Z/2012-01-11T00Z")),
+ spec.bucketInterval(DateTimes.of("2012-01-10T00Z"))
+ );
+
+ Assert.assertEquals(
+ Optional.<Interval>absent(),
+ spec.bucketInterval(DateTimes.of("2012-01-12T00Z"))
);
Assert.assertEquals(
@@ -99,6 +129,7 @@ public class UniformGranularityTest
Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-08T01Z"))
);
+
}
@Test
@@ -132,9 +163,9 @@ public class UniformGranularityTest
try {
final GranularitySpec rtSpec =
JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec),
GranularitySpec.class);
Assert.assertEquals(
- "Round-trip bucketIntervals",
- spec.bucketIntervals(),
- rtSpec.bucketIntervals()
+ "Round-trip sortedBucketIntervals",
+ ImmutableList.copyOf(spec.sortedBucketIntervals()),
+ ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
);
Assert.assertEquals(
"Round-trip granularity",
@@ -253,10 +284,9 @@ public class UniformGranularityTest
)
);
- Assert.assertTrue(spec.bucketIntervals().isPresent());
+ Assert.assertTrue(spec.sortedBucketIntervals().iterator().hasNext());
- final Optional<SortedSet<Interval>> sortedSetOptional =
spec.bucketIntervals();
- final SortedSet<Interval> intervals = sortedSetOptional.get();
+ final Iterable<Interval> intervals = spec.sortedBucketIntervals();
ArrayList<Long> actualIntervals = new ArrayList<>();
for (Interval interval : intervals) {
actualIntervals.add(interval.toDurationMillis());
@@ -279,6 +309,25 @@ public class UniformGranularityTest
Assert.assertEquals(expectedIntervals, actualIntervals);
}
+ @Test
+ public void testUniformGranularitySpecWithLargeNumberOfIntervalsDoesNotBlowUp()
+ {
+ // Constructing a uniform spec must not materialize its intervals (that could cause an OOM); the count below iterates them without collecting them.
+ final GranularitySpec spec = new UniformGranularitySpec(
+ Granularities.SECOND,
+ null,
+ Collections.singletonList(
+ Intervals.of("2012-01-01T00Z/P10Y")
+ )
+ );
+
+ Assert.assertNotNull(spec);
+
+ int count = Iterators.size(spec.sortedBucketIntervals().iterator());
+ // 10 years of seconds, plus one extra day for each of the leap years 2012, 2016 and 2020
+ Assert.assertEquals(3600 * 24 * 365 * 10 + 3 * 24 * 3600, count);
+ }
+
private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2)
{
Assert.assertNotEquals(spec1, spec2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]