gianm commented on code in PR #13280:
URL: https://github.com/apache/druid/pull/13280#discussion_r1013170650
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1014,6 +897,254 @@ static DimensionSchema createDimensionSchema(
}
}
+ /**
+ * Class for fetching and analyzing existing segments in order to generate
reingestion specs.
+ */
+ static class ExistingSegmentAnalyzer
+ {
+ private final List<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments;
+
+ private final boolean needRollup;
+ private final boolean needQueryGranularity;
+ private final boolean needDimensionsSpec;
+ private final boolean needMetricsSpec;
+
+ // For processRollup:
+ private boolean rollupIsValid = true;
+ private Boolean rollup;
+
+ // For processQueryGranularity:
+ private Granularity queryGranularity;
+
+ // For processDimensionsSpec:
+ private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
+ private final Map<String, DimensionSchema> dimensionSchemaMap = new
HashMap<>();
+
+ // For processMetricsSpec:
+ private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new
HashSet<>();
+
+ ExistingSegmentAnalyzer(
+ final Iterable<Pair<DataSegment,
Supplier<ResourceHolder<QueryableIndex>>>> segments,
+ final boolean needRollup,
+ final boolean needQueryGranularity,
+ final boolean needDimensionsSpec,
+ final boolean needMetricsSpec
+ )
+ {
+ this.segments = Lists.newArrayList(segments);
+ this.needRollup = needRollup;
+ this.needQueryGranularity = needQueryGranularity;
+ this.needDimensionsSpec = needDimensionsSpec;
+ this.needMetricsSpec = needMetricsSpec;
+ }
+
+ public void fetchAndProcessIfNeeded()
+ {
+ if (!needRollup && !needQueryGranularity && !needDimensionsSpec &&
!needMetricsSpec) {
+ // Nothing to do; short-circuit and don't fetch segments.
+ return;
+ }
+
+ sortSegmentsList();
+
+ for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>
segmentPair : segments) {
+ final DataSegment dataSegment = segmentPair.lhs;
+
+ try (final ResourceHolder<QueryableIndex> queryableIndexHolder =
segmentPair.rhs.get()) {
+ final QueryableIndex index = queryableIndexHolder.get();
+
+ if (index != null) { // Avoid tombstones (null QueryableIndex)
+ if (index.getMetadata() == null) {
+ throw new RE(
+ "Index metadata doesn't exist for segment [%s]. Try
providing explicit rollup, "
+ + "queryGranularity, dimensionsSpec, and metricsSpec.",
dataSegment.getId()
+ );
+ }
+
+ processRollup(index);
+ processQueryGranularity(index);
+ processDimensionsSpec(index);
+ processMetricsSpec(index);
+ }
+ }
+ }
+ }
+
+ public Boolean getRollup()
+ {
+ if (!needRollup) {
+ throw new ISE("Not computing rollup");
+ }
+
+ return rollup;
+ }
+
+ public Granularity getQueryGranularity()
+ {
+ if (!needQueryGranularity) {
+ throw new ISE("Not computing queryGranularity");
+ }
+
+ return queryGranularity;
+ }
+
+ public DimensionsSpec getDimensionsSpec()
+ {
+ if (!needDimensionsSpec) {
+ throw new ISE("Not computing dimensionsSpec");
+ }
+
+ final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
+ final List<DimensionSchema> dimensionSchemas =
+ IntStream.range(0, orderedDims.size())
+ .mapToObj(i -> {
+ final String dimName = orderedDims.get(i);
+ return Preconditions.checkNotNull(
+ dimensionSchemaMap.get(dimName),
+ "Cannot find dimension[%s] from dimensionSchemaMap",
+ dimName
+ );
+ })
+ .collect(Collectors.toList());
+
+ return new DimensionsSpec(dimensionSchemas);
+ }
+
+ public AggregatorFactory[] getMetricsSpec()
+ {
+ if (!needMetricsSpec) {
+ throw new ISE("Not computing metricsSpec");
+ }
+
+ if (aggregatorFactoryLists.isEmpty()) {
+ return new AggregatorFactory[0];
+ }
+
+ final AggregatorFactory[] mergedAggregators =
AggregatorFactory.mergeAggregators(
+ aggregatorFactoryLists.stream()
+ .map(xs -> xs.toArray(new
AggregatorFactory[0]))
+ .collect(Collectors.toList())
+ );
+
+ if (mergedAggregators == null) {
+ throw new ISE(
+ "Failed to merge existing aggregators when generating metricsSpec;
"
+ + "try providing explicit metricsSpec"
+ );
+ }
+
+ return mergedAggregators;
+ }
+
+ /**
+ * Sort segments in order, such that we look at later segments prior to
earlier ones. Useful when analyzing
+ * dimensions, as it allows us to take the latest value we see, and
therefore prefer types from more recent
+ * segments, if there was a change.
+ */
+ private void sortSegmentsList()
+ {
+ segments.sort(
+ (o1, o2) ->
Comparators.intervalsByStartThenEnd().compare(o2.lhs.getInterval(),
o1.lhs.getInterval())
+ );
+ }
+
+ private void processRollup(final QueryableIndex index)
+ {
+ if (!needRollup) {
+ return;
+ }
+
+ // carry-overs (i.e. query granularity & rollup) are valid iff they are
the same in every segment:
+ // Pick rollup value if all segments being compacted have the same,
non-null, value otherwise set it to false
+ if (rollupIsValid) {
+ Boolean isRollup = index.getMetadata().isRollup();
+ if (isRollup == null) {
+ rollupIsValid = false;
+ rollup = false;
+ } else if (rollup == null) {
+ rollup = isRollup;
+ } else if (!isRollup.equals(rollup)) {
+ rollupIsValid = false;
+ rollup = false;
+ }
+ }
Review Comment:
   Good idea. Updated to a similar but equivalent form:
```
final Boolean isIndexRollup = index.getMetadata().isRollup();
rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]