imply-cheddar commented on code in PR #14708:
URL: https://github.com/apache/druid/pull/14708#discussion_r1282590490


##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -151,128 +185,80 @@ public Function<Result<TopNResultValue>, 
Result<TopNResultValue>> makePreCompute
       final MetricManipulationFn fn
   )
   {
+    //noinspection ObjectEquality
+    if (MetricManipulatorFns.deserializing() != fn) {
+      throw DruidException.defensive("This method can only be used to 
deserialize.");
+    }
+
     return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
     {
-      private String dimension = query.getDimensionSpec().getOutputName();
-      private final List<PostAggregator> prunedAggs = 
prunePostAggregators(query);
       private final AggregatorFactory[] aggregatorFactories = 
query.getAggregatorSpecs()
                                                                    
.toArray(new AggregatorFactory[0]);
       private final String[] aggFactoryNames = 
extractFactoryName(query.getAggregatorSpecs());
 
       @Override
       public Result<TopNResultValue> apply(Result<TopNResultValue> result)
       {
-        List<Map<String, Object>> serializedValues = Lists.newArrayList(
-            Iterables.transform(
-                result.getValue(),
-                new Function<DimensionAndMetricValueExtractor, Map<String, 
Object>>()
-                {
-                  @Override
-                  public Map<String, Object> 
apply(DimensionAndMetricValueExtractor input)
-                  {
-                    final Map<String, Object> values = 
Maps.newHashMapWithExpectedSize(
-                        aggregatorFactories.length
-                        + prunedAggs.size()
-                        + 1
-                    );
-
-                    for (int i = 0; i < aggregatorFactories.length; ++i) {
-                      final String aggName = aggFactoryNames[i];
-                      values.put(aggName, 
fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
-                    }
+        final List<DimensionAndMetricValueExtractor> values = 
result.getValue().getValue();
+        final List<DimensionAndMetricValueExtractor> newValues = new 
ArrayList<>(values.size());
 
-                    for (PostAggregator postAgg : prunedAggs) {
-                      final String name = postAgg.getName();
-                      Object calculatedPostAgg = input.getMetric(name);
-                      if (calculatedPostAgg != null) {
-                        values.put(name, calculatedPostAgg);
-                      } else {
-                        values.put(name, postAgg.compute(values));
-                      }
-                    }
-                    values.put(dimension, input.getDimensionValue(dimension));
+        for (DimensionAndMetricValueExtractor input : values) {
+          final Map<String, Object> map = new 
LinkedHashMap<>(input.getBaseObject());
 
-                    return values;
-                  }
-                }
-            )
-        );
+          for (int i = 0; i < aggregatorFactories.length; ++i) {
+            final String aggName = aggFactoryNames[i];
+            map.put(aggName, 
aggregatorFactories[i].deserialize(map.get(aggName)));
+          }
 
-        return new Result<TopNResultValue>(
-            result.getTimestamp(),
-            new TopNResultValue(serializedValues)
-        );
+          newValues.add(new DimensionAndMetricValueExtractor(map));
+        }
+
+        return new Result<>(result.getTimestamp(), new 
TopNResultValue(newValues));
       }
     };
   }
 
+  @SuppressWarnings("ObjectEquality")
   @Override
   public Function<Result<TopNResultValue>, Result<TopNResultValue>> 
makePostComputeManipulatorFn(
-      final TopNQuery query,
-      final MetricManipulationFn fn
+      TopNQuery query,
+      MetricManipulationFn fn
   )
   {
-    return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
-    {
-      private String dimension = query.getDimensionSpec().getOutputName();
-      private final AggregatorFactory[] aggregatorFactories = 
query.getAggregatorSpecs()
-                                                                   
.toArray(new AggregatorFactory[0]);
-      private final String[] aggFactoryNames = 
extractFactoryName(query.getAggregatorSpecs());
-      private final PostAggregator[] postAggregators = 
query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
+    if (MetricManipulatorFns.identity() == fn) {
+      return result -> result;
+    }
 
-      @Override
-      public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+    if (MetricManipulatorFns.finalizing() == fn) {
+      return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
       {
-        List<Map<String, Object>> serializedValues = Lists.newArrayList(
-            Iterables.transform(
-                result.getValue(),
-                new Function<DimensionAndMetricValueExtractor, Map<String, 
Object>>()
-                {
-                  @Override
-                  public Map<String, Object> 
apply(DimensionAndMetricValueExtractor input)
-                  {
-                    final Map<String, Object> values = 
Maps.newHashMapWithExpectedSize(
-                        aggregatorFactories.length
-                        + query.getPostAggregatorSpecs().size()
-                        + 1
-                    );
-
-                    // Put non-finalized aggregators before post-aggregators.
-                    for (final String name : aggFactoryNames) {
-                      values.put(name, input.getMetric(name));
-                    }
+        private final AggregatorFactory[] aggregatorFactories = 
query.getAggregatorSpecs()
+                                                                     
.toArray(new AggregatorFactory[0]);
+        private final String[] aggFactoryNames = 
extractFactoryName(query.getAggregatorSpecs());
 
-                    // Put dimension, post-aggregators might depend on it.
-                    values.put(dimension, input.getDimensionValue(dimension));
-
-                    // Put post-aggregators.
-                    for (PostAggregator postAgg : postAggregators) {
-                      Object calculatedPostAgg = 
input.getMetric(postAgg.getName());
-                      if (calculatedPostAgg != null) {
-                        values.put(postAgg.getName(), calculatedPostAgg);
-                      } else {
-                        values.put(postAgg.getName(), postAgg.compute(values));
-                      }
-                    }
+        @Override
+        public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+        {
+          final List<DimensionAndMetricValueExtractor> values = 
result.getValue().getValue();
+          final List<DimensionAndMetricValueExtractor> newValues = new 
ArrayList<>(values.size());
 
-                    // Put finalized aggregators now that post-aggregators are 
done.
-                    for (int i = 0; i < aggFactoryNames.length; ++i) {
-                      final String name = aggFactoryNames[i];
-                      values.put(name, fn.manipulate(aggregatorFactories[i], 
input.getMetric(name)));
-                    }
+          for (DimensionAndMetricValueExtractor input : values) {
+            final Map<String, Object> map = new 
LinkedHashMap<>(input.getBaseObject());
 
-                    return values;
-                  }
-                }
-            )
-        );
+            for (int i = 0; i < aggregatorFactories.length; ++i) {
+              final String aggName = aggFactoryNames[i];
+              map.put(aggName, 
aggregatorFactories[i].finalizeComputation(map.get(aggName)));
+            }
 
-        return new Result<>(
-            result.getTimestamp(),
-            new TopNResultValue(serializedValues)
-        );
-      }
-    };
+            newValues.add(new DimensionAndMetricValueExtractor(map));
+          }
+
+          return new Result<>(result.getTimestamp(), new 
TopNResultValue(newValues));
+        }
+      };
+    }
+
+    throw DruidException.defensive("This method can only be used to 
finalize.");

Review Comment:
   There are 2 different if statements before this (one for identity and one 
for deserialization).  An early return would require a negated OR expression 
and I just prefer the positive `if` statements instead ('cause I'm gonna have 
to do them anyway given that they are different methods)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to