Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2310#discussion_r157704512 --- Diff: nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java --- @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.druid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.codehaus.jackson.map.ObjectMapper; + +import com.metamx.common.Granularity; +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.ClusteredBeamTuning; +import com.metamx.tranquility.druid.DruidBeamConfig; +import com.metamx.tranquility.druid.DruidBeams; +import com.metamx.tranquility.druid.DruidDimensions; +import com.metamx.tranquility.druid.DruidEnvironment; +import com.metamx.tranquility.druid.DruidLocation; +import com.metamx.tranquility.druid.DruidRollup; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; + +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.joda.time.DateTime; +import org.joda.time.Period; + + +@Tags({"Druid", "Timeseries", "OLAP", "ingest"}) +@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. " + + "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided." + + "Details on how describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html") +public class DruidTranquilityController extends AbstractControllerService implements DruidTranquilityService { + private final static String FIREHOSE_PATTERN = "druid:firehose:%s"; + + private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute"); + private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes"); + private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour"); + + private final static List<String> TIME_ORDINALS = Arrays.asList("SECOND", "MINUTE", "FIVE_MINUTE", "TEN_MINUTE", "FIFTEEN_MINUTE", "HOUR", "SIX_HOUR", "DAY", "WEEK", "MONTH", "YEAR"); + + private Tranquilizer tranquilizer = null; + private String transitUri = ""; + + public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder() + .name("druid-cs-data-source") + .displayName("Druid Data Source") + .description("A data source is the Druid equivalent of a database table.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("druid-cs-zk-connect-string") + .displayName("Zookeeper Connection String") + .description("A comma-separated list of host:port pairs, each corresponding to a ZooKeeper server. Ex: localhost:2181") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_BASE_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-base-sleep") + .displayName("Zookeeper Retry Base Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time (in milliseconds) to wait at first before retrying.") + .required(true) + .defaultValue("1000") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_MAX_RETRIES = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-max-retries") + .displayName("Zookeeper Retry Max Retries") + .description("When a connection to Zookeeper needs to be retried, this property specifies how many times to attempt reconnection.") + .required(true) + .defaultValue("20") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-sleep") + .displayName("Zookeeper Retry Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time to sleep (in milliseconds) between retries.") + .required(true) + .defaultValue("30000") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-index-service-path") + .displayName("Index Service Path") + .description("Druid Index Service path as defined via the Druid Overlord druid.service property.") + .required(true) + .defaultValue("druid/overlord") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-discovery-path") + .displayName("Discovery Path") + .description("Druid Discovery Path as configured in Druid Common druid.discovery.curator.path property") + .required(true) + .defaultValue("/druid/discovery") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor CLUSTER_PARTITIONS = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-partitions") + .displayName("Cluster Partitions") + .description("The number of partitions in the Druid cluster.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_REPLICATION = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-replication") + .displayName("Cluster Replication") + .description("The replication factor the Druid cluster.") --- End diff -- The replication factor **of** the Druid cluster.
---