Repository: nifi
Updated Branches:
  refs/heads/master 52d6b9cfa -> 9b461027a
NIFI-5188: DruidTranquilityController does not fully support Druid aggregator Rollback Druid 0.9.2 to 0.9.1 Fixed checkstyle error Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #2696 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b461027 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b461027 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b461027 Branch: refs/heads/master Commit: 9b461027a4e9940f7ade40e9169fc926f9289eca Parents: 52d6b9c Author: Dongkyu Hwangbo <hwangb...@gmail.com> Authored: Mon May 14 16:33:49 2018 +0900 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Fri Jun 15 09:52:14 2018 -0400 ---------------------------------------------------------------------- .../nifi-druid-controller-service/pom.xml | 12 ++- .../druid/DruidTranquilityController.java | 81 ++++++-------------- nifi-nar-bundles/nifi-druid-bundle/pom.xml | 2 +- 3 files changed, 34 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml index 2f52691..139aad8 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -47,10 +47,20 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>io.druid.extensions</groupId> + <artifactId>druid-histogram</artifactId> + <version>${druid.version}</version> + </dependency> + <dependency> + <groupId>io.druid.extensions</groupId> + <artifactId>druid-datasketches</artifactId> + <version>${druid.version}</version> + 
</dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>1.7.0-SNAPSHOT</version> <scope>test</scope> </dependency> </dependencies> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java index 158579a..5d617f7 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java @@ -24,11 +24,17 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.AggregatorsModule; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.query.aggregation.datasketches.theta.SketchModule; +import io.druid.query.aggregation.histogram.ApproximateHistogramDruidModule; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -47,7 +53,6 @@ import 
org.apache.nifi.controller.api.druid.DruidTranquilityService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; -import org.codehaus.jackson.map.ObjectMapper; import com.metamx.common.Granularity; import com.metamx.tranquility.beam.Beam; @@ -62,15 +67,7 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer; import com.metamx.tranquility.typeclass.Timestamper; import io.druid.data.input.impl.TimestampSpec; -import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.query.aggregation.DoubleMaxAggregatorFactory; -import io.druid.query.aggregation.DoubleMinAggregatorFactory; -import io.druid.query.aggregation.DoubleSumAggregatorFactory; -import io.druid.query.aggregation.LongMaxAggregatorFactory; -import io.druid.query.aggregation.LongMinAggregatorFactory; -import io.druid.query.aggregation.LongSumAggregatorFactory; import org.joda.time.DateTime; import org.joda.time.Period; @@ -518,56 +515,22 @@ public class DruidTranquilityController extends AbstractControllerService implem } private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) { - List<AggregatorFactory> aggregatorList = new LinkedList<>(); - List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON); - for (Map<String, String> aggregator : aggregatorInfo) { - - if (aggregator.get("type").equalsIgnoreCase("count")) { - aggregatorList.add(getCountAggregator(aggregator)); - } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { - aggregatorList.add(getDoubleSumAggregator(aggregator)); - } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { - aggregatorList.add(getDoubleMaxAggregator(aggregator)); - } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) { - aggregatorList.add(getDoubleMinAggregator(aggregator)); - } else 
if (aggregator.get("type").equalsIgnoreCase("longsum")) { - aggregatorList.add(getLongSumAggregator(aggregator)); - } else if (aggregator.get("type").equalsIgnoreCase("longmax")) { - aggregatorList.add(getLongMaxAggregator(aggregator)); - } else if (aggregator.get("type").equalsIgnoreCase("longmin")) { - aggregatorList.add(getLongMinAggregator(aggregator)); - } - } - - return aggregatorList; - } - - private AggregatorFactory getLongMinAggregator(Map<String, String> map) { - return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName")); - } - - private AggregatorFactory getLongMaxAggregator(Map<String, String> map) { - return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName")); - } - - private AggregatorFactory getLongSumAggregator(Map<String, String> map) { - return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName")); - } - - private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) { - return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName")); - } - - private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) { - return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName")); - } - - private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) { - return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName")); - } + ComponentLog log = getLogger(); + ObjectMapper mapper = new ObjectMapper(null); + mapper.registerModule(new AggregatorsModule()); + mapper.registerModules(Lists.newArrayList(new SketchModule().getJacksonModules())); + mapper.registerModules(Lists.newArrayList(new ApproximateHistogramDruidModule().getJacksonModules())); - private AggregatorFactory getCountAggregator(Map<String, String> map) { - return new CountAggregatorFactory(map.get("name")); + try { + return mapper.readValue( + aggregatorJSON, + new TypeReference<List<AggregatorFactory>>() { + } + ); + } catch (IOException e) { + log.error(e.getMessage(), e); + 
return null; + } } private Granularity getGranularity(String granularityString) { http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml index 6543e7f..3deee9d 100644 --- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -52,4 +52,4 @@ <module>nifi-druid-controller-service</module> <module>nifi-druid-processors</module> </modules> -</project> \ No newline at end of file +</project>