gianm commented on code in PR #13280:
URL: https://github.com/apache/druid/pull/13280#discussion_r1013173438


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1014,6 +897,254 @@ static DimensionSchema createDimensionSchema(
     }
   }
 
+  /**
+   * Class for fetching and analyzing existing segments in order to generate 
reingestion specs.
+   */
+  static class ExistingSegmentAnalyzer
+  {
+    private final List<Pair<DataSegment, 
Supplier<ResourceHolder<QueryableIndex>>>> segments;
+
+    private final boolean needRollup;
+    private final boolean needQueryGranularity;
+    private final boolean needDimensionsSpec;
+    private final boolean needMetricsSpec;
+
+    // For processRollup:
+    private boolean rollupIsValid = true;
+    private Boolean rollup;
+
+    // For processQueryGranularity:
+    private Granularity queryGranularity;
+
+    // For processDimensionsSpec:
+    private final BiMap<String, Integer> uniqueDims = HashBiMap.create();
+    private final Map<String, DimensionSchema> dimensionSchemaMap = new 
HashMap<>();
+
+    // For processMetricsSpec:
+    private final Set<List<AggregatorFactory>> aggregatorFactoryLists = new 
HashSet<>();
+
+    /**
+     * Creates an analyzer over the given segments. No segment is fetched here; the suppliers are only invoked by
+     * {@link #fetchAndProcessIfNeeded()}.
+     *
+     * @param segments             pairs of a segment and a supplier of a holder for its index (the held index is
+     *                             null for tombstones)
+     * @param needRollup           whether {@link #getRollup()} may be called
+     * @param needQueryGranularity whether {@link #getQueryGranularity()} may be called
+     * @param needDimensionsSpec   whether {@link #getDimensionsSpec()} may be called
+     * @param needMetricsSpec      whether {@link #getMetricsSpec()} may be called
+     */
+    ExistingSegmentAnalyzer(
+        final Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments,
+        final boolean needRollup,
+        final boolean needQueryGranularity,
+        final boolean needDimensionsSpec,
+        final boolean needMetricsSpec
+    )
+    {
+      // Which outputs were requested; each getter checks its own flag.
+      this.needRollup = needRollup;
+      this.needQueryGranularity = needQueryGranularity;
+      this.needDimensionsSpec = needDimensionsSpec;
+      this.needMetricsSpec = needMetricsSpec;
+
+      // Copy into our own list, because sortSegmentsList() reorders it in place.
+      this.segments = Lists.newArrayList(segments);
+    }
+
+    /**
+     * Opens each segment's index (later intervals first) and passes every non-tombstone index to processRollup,
+     * processQueryGranularity, processDimensionsSpec and processMetricsSpec. If none of the four outputs was
+     * requested, returns immediately without invoking any segment supplier.
+     *
+     * @throws RE if a non-tombstone segment's index has no metadata
+     */
+    public void fetchAndProcessIfNeeded()
+    {
+      final boolean anythingNeeded = needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec;
+      if (!anythingNeeded) {
+        // Short-circuit: no segment is fetched.
+        return;
+      }
+
+      // Reorders the list in place so that later intervals are visited before earlier ones.
+      sortSegmentsList();
+
+      for (final Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> entry : segments) {
+        // try-with-resources closes the holder on every exit path, including 'continue' and the throw below.
+        try (final ResourceHolder<QueryableIndex> holder = entry.rhs.get()) {
+          final QueryableIndex queryableIndex = holder.get();
+          if (queryableIndex == null) {
+            // Tombstone segment (null QueryableIndex): nothing to analyze.
+            continue;
+          }
+
+          if (queryableIndex.getMetadata() == null) {
+            throw new RE(
+                "Index metadata doesn't exist for segment [%s]. Try providing explicit rollup, "
+                + "queryGranularity, dimensionsSpec, and metricsSpec.",
+                entry.lhs.getId()
+            );
+          }
+
+          processRollup(queryableIndex);
+          processQueryGranularity(queryableIndex);
+          processDimensionsSpec(queryableIndex);
+          processMetricsSpec(queryableIndex);
+        }
+      }
+    }
+
+    /**
+     * Returns the rollup value gathered by {@link #fetchAndProcessIfNeeded()}.
+     *
+     * @return the computed rollup; may be {@code null}, since the backing field has no initial value
+     * @throws ISE if this analyzer was constructed with {@code needRollup == false}
+     */
+    public Boolean getRollup()
+    {
+      if (needRollup) {
+        return rollup;
+      }
+
+      throw new ISE("Not computing rollup");
+    }
+
+    /**
+     * Returns the query granularity gathered by {@link #fetchAndProcessIfNeeded()}.
+     *
+     * @return the computed granularity; may be {@code null}, since the backing field has no initial value
+     * @throws ISE if this analyzer was constructed with {@code needQueryGranularity == false}
+     */
+    public Granularity getQueryGranularity()
+    {
+      if (needQueryGranularity) {
+        return queryGranularity;
+      }
+
+      throw new ISE("Not computing queryGranularity");
+    }
+
+    /**
+     * Builds a {@link DimensionsSpec} from the dimensions collected by {@link #fetchAndProcessIfNeeded()}. The
+     * dimensions are ordered by the integer position recorded for each name in {@code uniqueDims}.
+     *
+     * @throws ISE                 if this analyzer was constructed with {@code needDimensionsSpec == false}
+     * @throws NullPointerException if a positioned dimension has no entry in {@code dimensionSchemaMap}
+     */
+    public DimensionsSpec getDimensionsSpec()
+    {
+      if (!needDimensionsSpec) {
+        throw new ISE("Not computing dimensionsSpec");
+      }
+
+      // Invert name -> position so positions 0..n-1 can be looked up directly. This relies on the positions being
+      // contiguous from 0; a gap would produce a null name, which then fails the null check below.
+      final BiMap<Integer, String> positionToName = uniqueDims.inverse();
+      final List<DimensionSchema> schemasInOrder =
+          IntStream.range(0, positionToName.size())
+                   .mapToObj(position -> positionToName.get(position))
+                   .map(name -> Preconditions.checkNotNull(
+                       dimensionSchemaMap.get(name),
+                       "Cannot find dimension[%s] from dimensionSchemaMap",
+                       name
+                   ))
+                   .collect(Collectors.toList());
+
+      return new DimensionsSpec(schemasInOrder);
+    }
+
+    public AggregatorFactory[] getMetricsSpec()
+    {
+      if (!needMetricsSpec) {
+        throw new ISE("Not computing metricsSpec");
+      }
+
+      if (aggregatorFactoryLists.isEmpty()) {
+        return new AggregatorFactory[0];
+      }
+
+      final AggregatorFactory[] mergedAggregators = 
AggregatorFactory.mergeAggregators(
+          aggregatorFactoryLists.stream()
+                                .map(xs -> xs.toArray(new 
AggregatorFactory[0]))
+                                .collect(Collectors.toList())
+      );
+
+      if (mergedAggregators == null) {
+        throw new ISE(
+            "Failed to merge existing aggregators when generating metricsSpec; 
"
+            + "try providing explicit metricsSpec"
+        );
+      }
+
+      return mergedAggregators;
+    }
+
+    /**
+     * Sort segments in order, such that we look at later segments prior to 
earlier ones. Useful when analyzing
+     * dimensions, as it allows us to take the latest value we see, and 
therefore prefer types from more recent
+     * segments, if there was a change.
+     */
+    private void sortSegmentsList()
+    {
+      segments.sort(
+          (o1, o2) -> 
Comparators.intervalsByStartThenEnd().compare(o2.lhs.getInterval(), 
o1.lhs.getInterval())

Review Comment:
   Something still needs to extract the `interval`. I changed it to the following to improve clarity:
   
   ```
         segments.sort(
             Comparator.comparing(
                 o -> o.lhs.getInterval(),
                 Comparators.intervalsByStartThenEnd().reversed()
             )
         );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to