rohangarg commented on code in PR #14708:
URL: https://github.com/apache/druid/pull/14708#discussion_r1281830258
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -151,128 +185,80 @@ public Function<Result<TopNResultValue>,
Result<TopNResultValue>> makePreCompute
final MetricManipulationFn fn
)
{
+ //noinspection ObjectEquality
+ if (MetricManipulatorFns.deserializing() != fn) {
+ throw DruidException.defensive("This method can only be used to
deserialize.");
+ }
+
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- private String dimension = query.getDimensionSpec().getOutputName();
- private final List<PostAggregator> prunedAggs =
prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + prunedAggs.size()
- + 1
- );
-
- for (int i = 0; i < aggregatorFactories.length; ++i) {
- final String aggName = aggFactoryNames[i];
- values.put(aggName,
fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
- }
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- for (PostAggregator postAgg : prunedAggs) {
- final String name = postAgg.getName();
- Object calculatedPostAgg = input.getMetric(name);
- if (calculatedPostAgg != null) {
- values.put(name, calculatedPostAgg);
- } else {
- values.put(name, postAgg.compute(values));
- }
- }
- values.put(dimension, input.getDimensionValue(dimension));
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].deserialize(map.get(aggName)));
Review Comment:
Should we call `fn.manipulate(...)` here instead of calling `deserialize` directly?
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -151,128 +185,80 @@ public Function<Result<TopNResultValue>,
Result<TopNResultValue>> makePreCompute
final MetricManipulationFn fn
)
{
+ //noinspection ObjectEquality
+ if (MetricManipulatorFns.deserializing() != fn) {
+ throw DruidException.defensive("This method can only be used to
deserialize.");
+ }
+
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- private String dimension = query.getDimensionSpec().getOutputName();
- private final List<PostAggregator> prunedAggs =
prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + prunedAggs.size()
- + 1
- );
-
- for (int i = 0; i < aggregatorFactories.length; ++i) {
- final String aggName = aggFactoryNames[i];
- values.put(aggName,
fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
- }
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- for (PostAggregator postAgg : prunedAggs) {
- final String name = postAgg.getName();
- Object calculatedPostAgg = input.getMetric(name);
- if (calculatedPostAgg != null) {
- values.put(name, calculatedPostAgg);
- } else {
- values.put(name, postAgg.compute(values));
- }
- }
- values.put(dimension, input.getDimensionValue(dimension));
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].deserialize(map.get(aggName)));
+ }
- return new Result<TopNResultValue>(
- result.getTimestamp(),
- new TopNResultValue(serializedValues)
- );
+ newValues.add(new DimensionAndMetricValueExtractor(map));
+ }
+
+ return new Result<>(result.getTimestamp(), new
TopNResultValue(newValues));
}
};
}
+ @SuppressWarnings("ObjectEquality")
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>>
makePostComputeManipulatorFn(
- final TopNQuery query,
- final MetricManipulationFn fn
+ TopNQuery query,
+ MetricManipulationFn fn
)
{
- return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
- {
- private String dimension = query.getDimensionSpec().getOutputName();
- private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
-
.toArray(new AggregatorFactory[0]);
- private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- private final PostAggregator[] postAggregators =
query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
+ if (MetricManipulatorFns.identity() == fn) {
+ return result -> result;
+ }
- @Override
- public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ if (MetricManipulatorFns.finalizing() == fn) {
+ return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + query.getPostAggregatorSpecs().size()
- + 1
- );
-
- // Put non-finalized aggregators before post-aggregators.
- for (final String name : aggFactoryNames) {
- values.put(name, input.getMetric(name));
- }
+ private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
+
.toArray(new AggregatorFactory[0]);
+ private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- // Put dimension, post-aggregators might depend on it.
- values.put(dimension, input.getDimensionValue(dimension));
-
- // Put post-aggregators.
- for (PostAggregator postAgg : postAggregators) {
- Object calculatedPostAgg =
input.getMetric(postAgg.getName());
- if (calculatedPostAgg != null) {
- values.put(postAgg.getName(), calculatedPostAgg);
- } else {
- values.put(postAgg.getName(), postAgg.compute(values));
- }
- }
+ @Override
+ public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ {
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- // Put finalized aggregators now that post-aggregators are
done.
- for (int i = 0; i < aggFactoryNames.length; ++i) {
- final String name = aggFactoryNames[i];
- values.put(name, fn.manipulate(aggregatorFactories[i],
input.getMetric(name)));
- }
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].finalizeComputation(map.get(aggName)));
Review Comment:
Same here: calling `fn.manipulate(...)` instead of `finalizeComputation` directly would be better, I think.
##########
processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java:
##########
@@ -166,6 +166,10 @@ public class AggregatorUtil
*/
public static List<PostAggregator>
pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String
postAggName)
Review Comment:
The documentation and variable naming in this function are a bit confusing, since it can also be used to prune the post-aggregators that are not needed for a TopN metric. Should we treat TopN metrics as a kind of post-aggregator too?
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -151,128 +185,80 @@ public Function<Result<TopNResultValue>,
Result<TopNResultValue>> makePreCompute
final MetricManipulationFn fn
)
{
+ //noinspection ObjectEquality
+ if (MetricManipulatorFns.deserializing() != fn) {
+ throw DruidException.defensive("This method can only be used to
deserialize.");
+ }
+
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- private String dimension = query.getDimensionSpec().getOutputName();
- private final List<PostAggregator> prunedAggs =
prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + prunedAggs.size()
- + 1
- );
-
- for (int i = 0; i < aggregatorFactories.length; ++i) {
- final String aggName = aggFactoryNames[i];
- values.put(aggName,
fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
- }
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- for (PostAggregator postAgg : prunedAggs) {
- final String name = postAgg.getName();
- Object calculatedPostAgg = input.getMetric(name);
- if (calculatedPostAgg != null) {
- values.put(name, calculatedPostAgg);
- } else {
- values.put(name, postAgg.compute(values));
- }
- }
- values.put(dimension, input.getDimensionValue(dimension));
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].deserialize(map.get(aggName)));
+ }
- return new Result<TopNResultValue>(
- result.getTimestamp(),
- new TopNResultValue(serializedValues)
- );
+ newValues.add(new DimensionAndMetricValueExtractor(map));
+ }
+
+ return new Result<>(result.getTimestamp(), new
TopNResultValue(newValues));
}
};
}
+ @SuppressWarnings("ObjectEquality")
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>>
makePostComputeManipulatorFn(
- final TopNQuery query,
- final MetricManipulationFn fn
+ TopNQuery query,
+ MetricManipulationFn fn
)
{
- return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
- {
- private String dimension = query.getDimensionSpec().getOutputName();
- private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
-
.toArray(new AggregatorFactory[0]);
- private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- private final PostAggregator[] postAggregators =
query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
+ if (MetricManipulatorFns.identity() == fn) {
+ return result -> result;
+ }
- @Override
- public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ if (MetricManipulatorFns.finalizing() == fn) {
+ return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
- List<Map<String, Object>> serializedValues = Lists.newArrayList(
- Iterables.transform(
- result.getValue(),
- new Function<DimensionAndMetricValueExtractor, Map<String,
Object>>()
- {
- @Override
- public Map<String, Object>
apply(DimensionAndMetricValueExtractor input)
- {
- final Map<String, Object> values =
Maps.newHashMapWithExpectedSize(
- aggregatorFactories.length
- + query.getPostAggregatorSpecs().size()
- + 1
- );
-
- // Put non-finalized aggregators before post-aggregators.
- for (final String name : aggFactoryNames) {
- values.put(name, input.getMetric(name));
- }
+ private final AggregatorFactory[] aggregatorFactories =
query.getAggregatorSpecs()
+
.toArray(new AggregatorFactory[0]);
+ private final String[] aggFactoryNames =
extractFactoryName(query.getAggregatorSpecs());
- // Put dimension, post-aggregators might depend on it.
- values.put(dimension, input.getDimensionValue(dimension));
-
- // Put post-aggregators.
- for (PostAggregator postAgg : postAggregators) {
- Object calculatedPostAgg =
input.getMetric(postAgg.getName());
- if (calculatedPostAgg != null) {
- values.put(postAgg.getName(), calculatedPostAgg);
- } else {
- values.put(postAgg.getName(), postAgg.compute(values));
- }
- }
+ @Override
+ public Result<TopNResultValue> apply(Result<TopNResultValue> result)
+ {
+ final List<DimensionAndMetricValueExtractor> values =
result.getValue().getValue();
+ final List<DimensionAndMetricValueExtractor> newValues = new
ArrayList<>(values.size());
- // Put finalized aggregators now that post-aggregators are
done.
- for (int i = 0; i < aggFactoryNames.length; ++i) {
- final String name = aggFactoryNames[i];
- values.put(name, fn.manipulate(aggregatorFactories[i],
input.getMetric(name)));
- }
+ for (DimensionAndMetricValueExtractor input : values) {
+ final Map<String, Object> map = new
LinkedHashMap<>(input.getBaseObject());
- return values;
- }
- }
- )
- );
+ for (int i = 0; i < aggregatorFactories.length; ++i) {
+ final String aggName = aggFactoryNames[i];
+ map.put(aggName,
aggregatorFactories[i].finalizeComputation(map.get(aggName)));
+ }
- return new Result<>(
- result.getTimestamp(),
- new TopNResultValue(serializedValues)
- );
- }
- };
+ newValues.add(new DimensionAndMetricValueExtractor(map));
+ }
+
+ return new Result<>(result.getTimestamp(), new
TopNResultValue(newValues));
+ }
+ };
+ }
+
+ throw DruidException.defensive("This method can only be used to
finalize.");
Review Comment:
Could throw this exception early in the method instead, as a guard clause when `fn` is neither the identity nor the finalizing manipulator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]