This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c16d9da35a Improve documentation for tombstone generation and minor
improvement (#13907)
c16d9da35a is described below
commit c16d9da35ae87a111a19728354b09a3b4610a107
Author: Laksh Singla <[email protected]>
AuthorDate: Fri Mar 10 06:59:51 2023 +0530
Improve documentation for tombstone generation and minor improvement
(#13907)
* As a follow up to #13893, this PR improves the comments added along with
examples for the code, as well as adds handling for an edge case where the
generated tombstone boundaries were overshooting the bounds of MIN_TIME (or
MAX_TIME).
---
.../task/batch/parallel/TombstoneHelper.java | 45 ++++++++++++++--------
.../task/batch/parallel/TombstoneHelperTest.java | 39 +++++++++++++++++++
2 files changed, 68 insertions(+), 16 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 0c915d0025..61766cab77 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -26,6 +26,7 @@ import
org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
@@ -93,7 +94,7 @@ public class TombstoneHelper
List<Interval> retVal = new ArrayList<>();
GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
List<Interval> pushedSegmentsIntervals =
getCondensedPushedSegmentsIntervals(pushedSegments);
- List<Interval> intervalsForUsedSegments = getCondensedUsedIntervals(
+ List<Interval> intervalsForUsedSegments =
getExistingNonEmptyIntervalsOfDatasource(
dataSchema.getGranularitySpec().inputIntervals(),
dataSchema.getDataSource()
);
@@ -165,11 +166,12 @@ public class TombstoneHelper
}
/**
- * @param intervalsToDrop Empty intervals in the query that need to be
dropped. They should be aligned with the
- * replaceGranularity
+ * See the method body for an example and an in-depth explanation of how
the replace interval is created.
+ * @param intervalsToDrop Empty intervals in the query that need to be
dropped. They should be aligned with the
+ * replaceGranularity
* @param intervalsToReplace Intervals in the query which are eligible for
replacement with new data.
* They should be aligned with the
replaceGranularity
- * @param dataSource Datasource on which the replace is to be performed
+ * @param dataSource Datasource on which the replace is to be
performed
* @param replaceGranularity Granularity of the replace query
* @return Intervals computed for the tombstones
* @throws IOException
@@ -182,7 +184,7 @@ public class TombstoneHelper
) throws IOException
{
Set<Interval> retVal = new HashSet<>();
- List<Interval> usedIntervals =
getCondensedUsedIntervals(intervalsToReplace, dataSource);
+ List<Interval> usedIntervals =
getExistingNonEmptyIntervalsOfDatasource(intervalsToReplace, dataSource);
for (Interval intervalToDrop : intervalsToDrop) {
for (Interval usedInterval : usedIntervals) {
@@ -194,22 +196,29 @@ public class TombstoneHelper
continue;
}
- // Overlap might not be aligned with the granularity if the used
interval is not aligned with the granularity
- // However we align the boundaries manually, in the following code.
+ // "overlap" might not be aligned with the replaceGranularity if the used interval is not
aligned with the granularity of
+ // the REPLACE, i.e. the datasource's original granularity and the replace's
granularity are different
+
+ // However, we align the boundaries of the overlap with the
replaceGranularity manually, in the following code.
- // If the start is aligned, then bucketStart is idempotent, else it
will return the latest timestamp less than
- // overlap.getStart() which aligns with the replace granularity. That
extra interval that we are including
- // before the overlap should be contained in intervalToDrop because
intervalToDrop is aligned by the
- // replaceGranularity, and the overlap's beginning would always be
later than intervalToDrop (trivially,
- // because its the overlap) and if bucketStart floors the overlap
beginning, it cannot floor it before
- // the intervalToDrop's start
DateTime alignedIntervalStart =
replaceGranularity.bucketStart(overlap.getStart());
+ long alignedIntervalStartMillis =
Math.max(alignedIntervalStart.getMillis(), JodaUtils.MIN_INSTANT);
+ // If the start is aligned, then 'bucketStart()' is unchanged.
+ // Else 'bucketStart()' will return the latest timestamp less than
overlap.getStart() which aligns with the REPLACE granularity.
+
+ // That extra interval that we are adding before the overlap should be
contained in 'intervalToDrop' because
+ // intervalToDrop is aligned by the replaceGranularity.
+ // If the drop's interval is n, then the extra interval would start
from n + 1 (where 1 denotes the replaceGranularity)
+ // The overlap's beginning can never be earlier than intervalToDrop's start
(trivially,
+ // because it is the overlap) and if bucketStart floors the overlap
beginning, it cannot floor it before
+ // the intervalToDrop's start
// For example, if the replace granularity is DAY, intervalsToReplace
are 20/02/2023 - 24/02/2023 (always
// aligned with the replaceGranularity), intervalsToDrop are
22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
// If the relevant usedIntervals for the datasource are from
22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
// the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00.
When iterating over the overlap we will get
- // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 -
24/02/2023
+ // the interval 22/02/2023 01:00:00 - 23/02/2023 02:00:00. After
aligning it would become
+ // 22/02/2023T00:00:00Z - 24/02/2023T00:00:00Z (the end is exclusive)
// If the end is aligned, then we do not alter it, else we align the
end by getting the earliest time later
// than the overlap's end which aligns with the replace granularity.
Using the above-mentioned logic for the
@@ -220,7 +229,8 @@ public class TombstoneHelper
} else {
alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
}
- Interval alignedTombstoneInterval = new Interval(alignedIntervalStart,
alignedIntervalEnd);
+ long alignedIntervalEndMillis =
Math.min(alignedIntervalEnd.getMillis(), JodaUtils.MAX_INSTANT);
+ Interval alignedTombstoneInterval =
Intervals.utc(alignedIntervalStartMillis, alignedIntervalEndMillis);
retVal.add(alignedTombstoneInterval);
}
@@ -259,13 +269,16 @@ public class TombstoneHelper
/**
* Helper method to prune required tombstones. Only tombstones that cover
used intervals will be created
* since those that do not cover used intervals would be redundant.
+ * Example:
+ * For a datasource having segments for 2020-01-01/2020-12-31 and
2022-01-01/2022-12-31, this method would return
+ * the interval 2020-01-01/2020-12-31 if the input intervals asked for the
range between 2019 and 2021.
*
* @param inputIntervals Intervals corresponding to the task
* @param dataSource Datasource corresponding to the task
* @return Intervals corresponding to used segments that overlap with any of
the spec's input intervals
* @throws IOException If used segments cannot be retrieved
*/
- private List<Interval> getCondensedUsedIntervals(
+ private List<Interval> getExistingNonEmptyIntervalsOfDatasource(
List<Interval> inputIntervals,
String dataSource
) throws IOException
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
index c5977efaa8..0ba6ad9fd1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.indexing.DataSchema;
@@ -369,6 +370,44 @@ public class TombstoneHelperTest
Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")),
tombstoneIntervals);
}
+ @Test
+ public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws
IOException
+ {
+ Interval usedInterval = Intervals.ETERNITY;
+ Interval replaceInterval = Intervals.ETERNITY;
+ List<Interval> intervalsToDrop = ImmutableList.of(
+ Intervals.utc(JodaUtils.MIN_INSTANT, 10000),
+ Intervals.utc(100000, JodaUtils.MAX_INSTANT)
+ );
+ Granularity replaceGranularity = Granularities.DAY;
+
+ DataSegment existingUsedSegment =
+ DataSegment.builder()
+ .dataSource("test")
+ .interval(usedInterval)
+ .version("oldVersion")
+ .size(100)
+ .build();
+ Assert.assertFalse(existingUsedSegment.isTombstone());
+ Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+ .thenReturn(Collections.singletonList(existingUsedSegment));
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+ Set<Interval> tombstoneIntervals =
tombstoneHelper.computeTombstoneIntervalsForReplace(
+ intervalsToDrop,
+ ImmutableList.of(replaceInterval),
+ "test",
+ replaceGranularity
+ );
+ Assert.assertEquals(
+ ImmutableSet.of(
+
Intervals.of("-146136543-09-08T08:23:32.096Z/1970-01-02T00:00:00.000Z"),
+
Intervals.of("1970-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")
+ ),
+ tombstoneIntervals
+ );
+ }
+
@Test
public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws
IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]