imply-cheddar commented on code in PR #14708:
URL: https://github.com/apache/druid/pull/14708#discussion_r1282590490
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -151,128 +185,80 @@ public Function<Result<TopNResultValue>,
Result<TopNResultValue>> makePreCompute
final MetricManipulationFn fn
)
{
+ //noinspection ObjectEquality
+ if (MetricManipulatorFns.deserializing() != fn) {
+ throw DruidException.defensive("This method can only be used to
deserialize.");
+ }
+
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- private String dimension = query.getDimensionSpec().getOutputName();
- private final List<PostAggregator> prunedAggs =
prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + prunedAggs.size()
- + 1
- );
-
- for (int i = 0; i < aggregatorFactories.length; ++i) {
- final String aggName = aggFactoryNames[i];
- values.put(aggName,
fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
- }
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- for (PostAggregator postAgg : prunedAggs) {
- final String name = postAgg.getName();
- Object calculatedPostAgg = input.getMetric(name);
- if (calculatedPostAgg != null) {
- values.put(name, calculatedPostAgg);
- } else {
- values.put(name, postAgg.compute(values));
- }
- }
- values.put(dimension, input.getDimensionValue(dimension));
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].deserialize(map.get(aggName)));
+ }
- return new Result<TopNResultValue>(
- result.getTimestamp(),
- new TopNResultValue(serializedValues)
- );
+ newValues.add(new DimensionAndMetricValueExtractor(map));
+ }
+
+ return new Result<>(result.getTimestamp(), new
TopNResultValue(newValues));
}
};
}
+ @SuppressWarnings("ObjectEquality")
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>>
makePostComputeManipulatorFn(
- final TopNQuery query,
- final MetricManipulationFn fn
+ TopNQuery query,
+ MetricManipulationFn fn
)
{
- return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
- {
- private String dimension = query.getDimensionSpec().getOutputName();
- private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
-
.toArray(new AggregatorFactory[0]);
- private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- private final PostAggregator[] postAggregators =
query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
+ if (MetricManipulatorFns.identity() == fn) {
+ return result -> result;
+ }
- @Override
- public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ if (MetricManipulatorFns.finalizing() == fn) {
+ return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + query.getPostAggregatorSpecs().size()
- + 1
- );
-
- // Put non-finalized aggregators before post-aggregators.
- for (final String name : aggFactoryNames) {
- values.put(name, input.getMetric(name));
- }
+ private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
+
.toArray(new AggregatorFactory[0]);
+ private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- // Put dimension, post-aggregators might depend on it.
- values.put(dimension, input.getDimensionValue(dimension));
-
- // Put post-aggregators.
- for (PostAggregator postAgg : postAggregators) {
- Object calculatedPostAgg =
input.getMetric(postAgg.getName());
- if (calculatedPostAgg != null) {
- values.put(postAgg.getName(), calculatedPostAgg);
- } else {
- values.put(postAgg.getName(), postAgg.compute(values));
- }
- }
+ @Override
+ public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ {
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- // Put finalized aggregators now that post-aggregators are
done.
- for (int i = 0; i < aggFactoryNames.length; ++i) {
- final String name = aggFactoryNames[i];
- values.put(name, fn.manipulate(aggregatorFactories[i],
input.getMetric(name)));
- }
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].finalizeComputation(map.get(aggName)));
+ }
- return new Result<>(
- result.getTimestamp(),
- new TopNResultValue(serializedValues)
- );
- }
- };
+ newValues.add(new DimensionAndMetricValueExtractor(map));
+ }
+
+ return new Result<>(result.getTimestamp(), new
TopNResultValue(newValues));
+ }
+ };
+ }
+
+ throw DruidException.defensive("This method can only be used to
finalize.");
Review Comment:
There are two different `if` statements before this (one for identity and one
for finalization). An early return would require a negated OR expression, and
I prefer the positive `if` statements instead (because I am going to have to
write them anyway, given that they are different methods).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]