kfaraz commented on code in PR #13280:
URL: https://github.com/apache/druid/pull/13280#discussion_r1008612952
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -570,50 +574,38 @@ static List<ParallelIndexIngestionSpec>
createIngestionSchema(
final SegmentCacheManagerFactory segmentCacheManagerFactory,
final RetryPolicyFactory retryPolicyFactory,
final boolean dropExisting
- ) throws IOException, SegmentLoadingException
+ ) throws IOException
{
- NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String,
DataSegment>>> pair = prepareSegments(
+ final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
retrieveRelevantTimelineHolders(
Review Comment:
Thank you! The whole thing is much easier to read now after removing the
Pairs.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1014,6 +897,254 @@ static DimensionSchema createDimensionSchema(
}
}
+ /**
+ * Class for fetching and analyzing existing segments in order to generate
reingestion specs.
+ */
+ static class ExistingSegmentAnalyzer
+ {
+ private final List<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments;
+
+ private final boolean needRollup;
+ private final boolean needQueryGranularity;
+ private final boolean needDimensionsSpec;
+ private final boolean needMetricsSpec;
+
+ // For processRollup:
+ private boolean rollupIsValid = true;
+ private Boolean rollup;
+
+ // For processQueryGranularity:
+ private Granularity queryGranularity;
+
+ // For processDimensionsSpec:
+ private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
+ private final Map<String, DimensionSchema> dimensionSchemaMap = new
HashMap<>();
+
+ // For processMetricsSpec:
+ private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new
HashSet<>();
+
+ ExistingSegmentAnalyzer(
+ final Iterable<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments,
+ final boolean needRollup,
+ final boolean needQueryGranularity,
+ final boolean needDimensionsSpec,
+ final boolean needMetricsSpec
+ )
+ {
+ this.segments = Lists.newArrayList(segments);
+ this.needRollup = needRollup;
+ this.needQueryGranularity = needQueryGranularity;
+ this.needDimensionsSpec = needDimensionsSpec;
+ this.needMetricsSpec = needMetricsSpec;
+ }
+
+ public void fetchAndProcessIfNeeded()
+ {
+ if (!needRollup && !needQueryGranularity && !needDimensionsSpec &&
!needMetricsSpec) {
+ // Nothing to do; short-circuit and don't fetch segments.
+ return;
+ }
+
+ sortSegmentsList();
+
+ for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>
segmentPair : segments) {
+ final DataSegment dataSegment = segmentPair.lhs;
+
+ try (final ResourceHolder<QueryableIndex> queryableIndexHolder =
segmentPair.rhs.get()) {
+ final QueryableIndex index = queryableIndexHolder.get();
+
+ if (index != null) { // Avoid tombstones (null QueryableIndex)
+ if (index.getMetadata() == null) {
+ throw new RE(
+ "Index metadata doesn't exist for segment [%s]. Try
providing explicit rollup, "
+ + "queryGranularity, dimensionsSpec, and metricsSpec.",
dataSegment.getId()
+ );
+ }
+
+ processRollup(index);
+ processQueryGranularity(index);
+ processDimensionsSpec(index);
+ processMetricsSpec(index);
+ }
+ }
+ }
+ }
+
+ public Boolean getRollup()
+ {
+ if (!needRollup) {
+ throw new ISE("Not computing rollup");
+ }
+
+ return rollup;
+ }
+
+ public Granularity getQueryGranularity()
+ {
+ if (!needQueryGranularity) {
+ throw new ISE("Not computing queryGranularity");
+ }
+
+ return queryGranularity;
+ }
+
+ public DimensionsSpec getDimensionsSpec()
+ {
+ if (!needDimensionsSpec) {
+ throw new ISE("Not computing dimensionsSpec");
+ }
+
+ final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
+ final List<DimensionSchema> dimensionSchemas =
+ IntStream.range(0, orderedDims.size())
+ .mapToObj(i -> {
+ final String dimName = orderedDims.get(i);
+ return Preconditions.checkNotNull(
+ dimensionSchemaMap.get(dimName),
+ "Cannot find dimension[%s] from dimensionSchemaMap",
+ dimName
+ );
+ })
+ .collect(Collectors.toList());
+
+ return new DimensionsSpec(dimensionSchemas);
+ }
+
+ public AggregatorFactory[] getMetricsSpec()
+ {
+ if (!needMetricsSpec) {
+ throw new ISE("Not computing metricsSpec");
+ }
+
+ if (aggregatorFactoryLists.isEmpty()) {
+ return new AggregatorFactory[0];
+ }
+
+ final AggregatorFactory[] mergedAggregators =
AggregatorFactory.mergeAggregators(
+ aggregatorFactoryLists.stream()
+ .map(xs -> xs.toArray(new
AggregatorFactory[0]))
+ .collect(Collectors.toList())
+ );
+
+ if (mergedAggregators == null) {
+ throw new ISE(
+ "Failed to merge existing aggregators when generating metricsSpec;
"
+ + "try providing explicit metricsSpec"
+ );
+ }
+
+ return mergedAggregators;
+ }
+
+ /**
+ * Sort segments in order, such that we look at later segments prior to
earlier ones. Useful when analyzing
+ * dimensions, as it allows us to take the latest value we see, and
therefore prefer types from more recent
+ * segments, if there was a change.
+ */
+ private void sortSegmentsList()
+ {
+ segments.sort(
+ (o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o2.lhs.getInterval(),
o1.lhs.getInterval())
+ );
+ }
+
+ private void processRollup(final QueryableIndex index)
+ {
+ if (!needRollup) {
+ return;
+ }
+
+ // carry-overs (i.e. query granularity & rollup) are valid iff they are
the same in every segment:
+ // Pick rollup value if all segments being compacted have the same,
non-null, value otherwise set it to false
+ if (rollupIsValid) {
+ Boolean isRollup = index.getMetadata().isRollup();
+ if (isRollup == null) {
+ rollupIsValid = false;
+ rollup = false;
+ } else if (rollup == null) {
+ rollup = isRollup;
+ } else if (!isRollup.equals(rollup)) {
+ rollupIsValid = false;
+ rollup = false;
+ }
+ }
Review Comment:
This can probably be simplified by having a single `boolean rollup` member
field.
The field is initialized to `true`.
Then, this whole block becomes:
```suggestion
Boolean isSegmentRollup = index.getMetadata().isRollup();
rollup = rollup && isSegmentRollup != null &&
isSegmentRollup.equals(rollup);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1014,6 +897,254 @@ static DimensionSchema createDimensionSchema(
}
}
+ /**
+ * Class for fetching and analyzing existing segments in order to generate
reingestion specs.
+ */
+ static class ExistingSegmentAnalyzer
+ {
+ private final List<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments;
+
+ private final boolean needRollup;
+ private final boolean needQueryGranularity;
+ private final boolean needDimensionsSpec;
+ private final boolean needMetricsSpec;
+
+ // For processRollup:
+ private boolean rollupIsValid = true;
+ private Boolean rollup;
+
+ // For processQueryGranularity:
+ private Granularity queryGranularity;
+
+ // For processDimensionsSpec:
+ private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
+ private final Map<String, DimensionSchema> dimensionSchemaMap = new
HashMap<>();
+
+ // For processMetricsSpec:
+ private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new
HashSet<>();
+
+ ExistingSegmentAnalyzer(
+ final Iterable<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments,
+ final boolean needRollup,
+ final boolean needQueryGranularity,
+ final boolean needDimensionsSpec,
+ final boolean needMetricsSpec
+ )
+ {
+ this.segments = Lists.newArrayList(segments);
+ this.needRollup = needRollup;
+ this.needQueryGranularity = needQueryGranularity;
+ this.needDimensionsSpec = needDimensionsSpec;
+ this.needMetricsSpec = needMetricsSpec;
+ }
+
+ public void fetchAndProcessIfNeeded()
+ {
+ if (!needRollup && !needQueryGranularity && !needDimensionsSpec &&
!needMetricsSpec) {
+ // Nothing to do; short-circuit and don't fetch segments.
+ return;
+ }
+
+ sortSegmentsList();
+
+ for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>
segmentPair : segments) {
+ final DataSegment dataSegment = segmentPair.lhs;
+
+ try (final ResourceHolder<QueryableIndex> queryableIndexHolder =
segmentPair.rhs.get()) {
+ final QueryableIndex index = queryableIndexHolder.get();
+
+ if (index != null) { // Avoid tombstones (null QueryableIndex)
+ if (index.getMetadata() == null) {
+ throw new RE(
+ "Index metadata doesn't exist for segment [%s]. Try
providing explicit rollup, "
+ + "queryGranularity, dimensionsSpec, and metricsSpec.",
dataSegment.getId()
+ );
+ }
+
+ processRollup(index);
+ processQueryGranularity(index);
+ processDimensionsSpec(index);
+ processMetricsSpec(index);
+ }
+ }
+ }
+ }
+
+ public Boolean getRollup()
+ {
+ if (!needRollup) {
+ throw new ISE("Not computing rollup");
+ }
+
+ return rollup;
+ }
+
+ public Granularity getQueryGranularity()
+ {
+ if (!needQueryGranularity) {
+ throw new ISE("Not computing queryGranularity");
+ }
+
+ return queryGranularity;
+ }
+
+ public DimensionsSpec getDimensionsSpec()
+ {
+ if (!needDimensionsSpec) {
+ throw new ISE("Not computing dimensionsSpec");
+ }
+
+ final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
+ final List<DimensionSchema> dimensionSchemas =
+ IntStream.range(0, orderedDims.size())
+ .mapToObj(i -> {
+ final String dimName = orderedDims.get(i);
+ return Preconditions.checkNotNull(
+ dimensionSchemaMap.get(dimName),
+ "Cannot find dimension[%s] from dimensionSchemaMap",
+ dimName
+ );
+ })
+ .collect(Collectors.toList());
+
+ return new DimensionsSpec(dimensionSchemas);
+ }
+
+ public AggregatorFactory[] getMetricsSpec()
+ {
+ if (!needMetricsSpec) {
+ throw new ISE("Not computing metricsSpec");
+ }
+
+ if (aggregatorFactoryLists.isEmpty()) {
+ return new AggregatorFactory[0];
+ }
+
+ final AggregatorFactory[] mergedAggregators =
AggregatorFactory.mergeAggregators(
+ aggregatorFactoryLists.stream()
+ .map(xs -> xs.toArray(new
AggregatorFactory[0]))
+ .collect(Collectors.toList())
+ );
+
+ if (mergedAggregators == null) {
+ throw new ISE(
+ "Failed to merge existing aggregators when generating metricsSpec;
"
+ + "try providing explicit metricsSpec"
+ );
+ }
+
+ return mergedAggregators;
+ }
+
+ /**
+ * Sort segments in order, such that we look at later segments prior to
earlier ones. Useful when analyzing
+ * dimensions, as it allows us to take the latest value we see, and
therefore prefer types from more recent
+ * segments, if there was a change.
+ */
+ private void sortSegmentsList()
Review Comment:
Nit:
```suggestion
private void sortSegmentsListNewestFirst()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1014,6 +897,254 @@ static DimensionSchema createDimensionSchema(
}
}
+ /**
+ * Class for fetching and analyzing existing segments in order to generate
reingestion specs.
+ */
+ static class ExistingSegmentAnalyzer
+ {
+ private final List<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments;
+
+ private final boolean needRollup;
+ private final boolean needQueryGranularity;
+ private final boolean needDimensionsSpec;
+ private final boolean needMetricsSpec;
+
+ // For processRollup:
+ private boolean rollupIsValid = true;
+ private Boolean rollup;
+
+ // For processQueryGranularity:
+ private Granularity queryGranularity;
+
+ // For processDimensionsSpec:
+ private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
+ private final Map<String, DimensionSchema> dimensionSchemaMap = new
HashMap<>();
+
+ // For processMetricsSpec:
+ private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new
HashSet<>();
+
+ ExistingSegmentAnalyzer(
+ final Iterable<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments,
+ final boolean needRollup,
+ final boolean needQueryGranularity,
+ final boolean needDimensionsSpec,
+ final boolean needMetricsSpec
+ )
+ {
+ this.segments = Lists.newArrayList(segments);
+ this.needRollup = needRollup;
+ this.needQueryGranularity = needQueryGranularity;
+ this.needDimensionsSpec = needDimensionsSpec;
+ this.needMetricsSpec = needMetricsSpec;
+ }
+
+ public void fetchAndProcessIfNeeded()
+ {
+ if (!needRollup && !needQueryGranularity && !needDimensionsSpec &&
!needMetricsSpec) {
+ // Nothing to do; short-circuit and don't fetch segments.
+ return;
+ }
+
+ sortSegmentsList();
+
+ for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>
segmentPair : segments) {
+ final DataSegment dataSegment = segmentPair.lhs;
+
+ try (final ResourceHolder<QueryableIndex> queryableIndexHolder =
segmentPair.rhs.get()) {
+ final QueryableIndex index = queryableIndexHolder.get();
+
+ if (index != null) { // Avoid tombstones (null QueryableIndex)
+ if (index.getMetadata() == null) {
+ throw new RE(
+ "Index metadata doesn't exist for segment [%s]. Try
providing explicit rollup, "
+ + "queryGranularity, dimensionsSpec, and metricsSpec.",
dataSegment.getId()
+ );
+ }
+
+ processRollup(index);
+ processQueryGranularity(index);
+ processDimensionsSpec(index);
+ processMetricsSpec(index);
+ }
+ }
+ }
+ }
+
+ public Boolean getRollup()
+ {
+ if (!needRollup) {
+ throw new ISE("Not computing rollup");
+ }
+
+ return rollup;
+ }
+
+ public Granularity getQueryGranularity()
+ {
+ if (!needQueryGranularity) {
+ throw new ISE("Not computing queryGranularity");
+ }
+
+ return queryGranularity;
+ }
+
+ public DimensionsSpec getDimensionsSpec()
+ {
+ if (!needDimensionsSpec) {
+ throw new ISE("Not computing dimensionsSpec");
+ }
+
+ final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
+ final List<DimensionSchema> dimensionSchemas =
+ IntStream.range(0, orderedDims.size())
+ .mapToObj(i -> {
+ final String dimName = orderedDims.get(i);
+ return Preconditions.checkNotNull(
+ dimensionSchemaMap.get(dimName),
+ "Cannot find dimension[%s] from dimensionSchemaMap",
+ dimName
+ );
+ })
+ .collect(Collectors.toList());
+
+ return new DimensionsSpec(dimensionSchemas);
+ }
+
+ public AggregatorFactory[] getMetricsSpec()
+ {
+ if (!needMetricsSpec) {
+ throw new ISE("Not computing metricsSpec");
+ }
+
+ if (aggregatorFactoryLists.isEmpty()) {
+ return new AggregatorFactory[0];
+ }
+
+ final AggregatorFactory[] mergedAggregators =
AggregatorFactory.mergeAggregators(
+ aggregatorFactoryLists.stream()
+ .map(xs -> xs.toArray(new
AggregatorFactory[0]))
+ .collect(Collectors.toList())
+ );
+
+ if (mergedAggregators == null) {
+ throw new ISE(
+ "Failed to merge existing aggregators when generating metricsSpec;
"
+ + "try providing explicit metricsSpec"
+ );
+ }
+
+ return mergedAggregators;
+ }
+
+ /**
+ * Sort segments in order, such that we look at later segments prior to
earlier ones. Useful when analyzing
+ * dimensions, as it allows us to take the latest value we see, and
therefore prefer types from more recent
+ * segments, if there was a change.
+ */
+ private void sortSegmentsList()
+ {
+ segments.sort(
+ (o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o2.lhs.getInterval(),
o1.lhs.getInterval())
Review Comment:
Would this work?
```suggestion
Comparators.intervalsByStartThenEnd().reversed()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -725,44 +728,45 @@ private static ParallelIndexIOConfig createIoConfig(
);
}
- private static NonnullPair<Map<DataSegment, File>,
List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
+ private static List<TimelineObjectHolder<String, DataSegment>>
retrieveRelevantTimelineHolders(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
- ) throws IOException, SegmentLoadingException
+ ) throws IOException
{
final List<DataSegment> usedSegments =
segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
- final Map<DataSegment, File> segmentFileMap =
toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
- return new NonnullPair<>(segmentFileMap, timelineSegments);
+ return timelineSegments;
}
private static DataSchema createDataSchema(
String dataSource,
- List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+ Interval totalInterval,
+ Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>>
segments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
- // check index metadata &
- // Decide which values to propagate (i.e. carry over) for rollup &
queryGranularity
- final SettableSupplier<Boolean> rollup = new SettableSupplier<>();
- final SettableSupplier<Granularity> queryGranularity = new
SettableSupplier<>();
- decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity,
queryableIndexAndSegments);
-
- final Interval totalInterval = JodaUtils.umbrellaInterval(
- queryableIndexAndSegments.stream().map(p ->
p.rhs.getInterval()).collect(Collectors.toList())
+ // Check index metadata & decide which values to propagate (i.e. carry
over) for rollup & queryGranularity
+ final ExistingSegmentAnalyzer existingSegmentAnalyzer = new
ExistingSegmentAnalyzer(
+ segments,
+ granularitySpec.isRollup() == null,
Review Comment:
I wonder if it would be cleaner to pass all these specs to the
`ExistingSegmentAnalyzer` and let it decide whether it needs the spec or not.
This way, only the `ExistingSegmentAnalyzer` would be doing the null checks.
But maybe the current approach is better, since `ExistingSegmentAnalyzer` then
only deals with the downloaded segments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]