[
https://issues.apache.org/jira/browse/NIFI-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296557#comment-16296557
]
ASF GitHub Bot commented on NIFI-4428:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2310#discussion_r157704512
--- Diff:
nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
---
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.druid;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.api.druid.DruidTranquilityService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.metamx.common.Granularity;
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.ClusteredBeamTuning;
+import com.metamx.tranquility.druid.DruidBeamConfig;
+import com.metamx.tranquility.druid.DruidBeams;
+import com.metamx.tranquility.druid.DruidDimensions;
+import com.metamx.tranquility.druid.DruidEnvironment;
+import com.metamx.tranquility.druid.DruidLocation;
+import com.metamx.tranquility.druid.DruidRollup;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.metamx.tranquility.typeclass.Timestamper;
+
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongMaxAggregatorFactory;
+import io.druid.query.aggregation.LongMinAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+
+@Tags({"Druid", "Timeseries", "OLAP", "ingest"})
+@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing
Task using Tranquility API. "
+ + "If aggregation and roll-up of data is required, an Aggregator
JSON descriptor needs to be provided."
+ + "Details on how describe aggregation using JSON can be found at:
http://druid.io/docs/latest/querying/aggregations.html")
+public class DruidTranquilityController extends AbstractControllerService
implements DruidTranquilityService {
+ private final static String FIREHOSE_PATTERN = "druid:firehose:%s";
+
+ private final static AllowableValue PT1M = new AllowableValue("PT1M",
"1 minute", "1 minute");
+ private final static AllowableValue PT10M = new
AllowableValue("PT10M", "10 minutes", "10 minutes");
+ private final static AllowableValue PT60M = new
AllowableValue("PT60M", "60 minutes", "1 hour");
+
+ private final static List<String> TIME_ORDINALS =
Arrays.asList("SECOND", "MINUTE", "FIVE_MINUTE", "TEN_MINUTE",
"FIFTEEN_MINUTE", "HOUR", "SIX_HOUR", "DAY", "WEEK", "MONTH", "YEAR");
+
+ private Tranquilizer tranquilizer = null;
+ private String transitUri = "";
+
+ public static final PropertyDescriptor DATASOURCE = new
PropertyDescriptor.Builder()
+ .name("druid-cs-data-source")
+ .displayName("Druid Data Source")
+ .description("A data source is the Druid equivalent of a
database table.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING =
new PropertyDescriptor.Builder()
+ .name("druid-cs-zk-connect-string")
+ .displayName("Zookeeper Connection String")
+ .description("A comma-separated list of host:port pairs, each
corresponding to a ZooKeeper server. Ex: localhost:2181")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ZOOKEEPER_RETRY_BASE_SLEEP_TIME
= new PropertyDescriptor.Builder()
+ .name("druid-cs-zk-retry-base-sleep")
+ .displayName("Zookeeper Retry Base Sleep Time")
+ .description("When a connection to Zookeeper needs to be
retried, this property specifies the amount of time (in milliseconds) to wait
at first before retrying.")
+ .required(true)
+ .defaultValue("1000")
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ZOOKEEPER_RETRY_MAX_RETRIES =
new PropertyDescriptor.Builder()
+ .name("druid-cs-zk-retry-max-retries")
+ .displayName("Zookeeper Retry Max Retries")
+ .description("When a connection to Zookeeper needs to be
retried, this property specifies how many times to attempt reconnection.")
+ .required(true)
+ .defaultValue("20")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ZOOKEEPER_RETRY_SLEEP_TIME =
new PropertyDescriptor.Builder()
+ .name("druid-cs-zk-retry-sleep")
+ .displayName("Zookeeper Retry Sleep Time")
+ .description("When a connection to Zookeeper needs to be
retried, this property specifies the amount of time to sleep (in milliseconds)
between retries.")
+ .required(true)
+ .defaultValue("30000")
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new
PropertyDescriptor.Builder()
+ .name("druid-cs-index-service-path")
+ .displayName("Index Service Path")
+ .description("Druid Index Service path as defined via the
Druid Overlord druid.service property.")
+ .required(true)
+ .defaultValue("druid/overlord")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new
PropertyDescriptor.Builder()
+ .name("druid-cs-discovery-path")
+ .displayName("Discovery Path")
+ .description("Druid Discovery Path as configured in Druid
Common druid.discovery.curator.path property")
+ .required(true)
+ .defaultValue("/druid/discovery")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor CLUSTER_PARTITIONS = new
PropertyDescriptor.Builder()
+ .name("druid-cs-cluster-partitions")
+ .displayName("Cluster Partitions")
+ .description("The number of partitions in the Druid cluster.")
+ .required(true)
+ .defaultValue("1")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CLUSTER_REPLICATION = new
PropertyDescriptor.Builder()
+ .name("druid-cs-cluster-replication")
+ .displayName("Cluster Replication")
+ .description("The replication factor the Druid cluster.")
--- End diff --
The replication factor **of** the Druid cluster.
> Implement PutDruid Processor and Controller
> -------------------------------------------
>
> Key: NIFI-4428
> URL: https://issues.apache.org/jira/browse/NIFI-4428
> Project: Apache NiFi
> Issue Type: New Feature
> Affects Versions: 1.3.0
> Reporter: Vadim Vaks
> Assignee: Matt Burgess
>
> Implement a PutDruid Processor and Controller using the Tranquility API. This
> will enable NiFi to index the contents of flow files in Druid. The
> implementation should also be able to handle late-arriving data (i.e., the
> event timestamp points to a Druid indexing task that has already closed, with
> its segment granularity and grace window period expired). Late-arriving data
> is typically dropped. NiFi should allow late-arriving data to be diverted to
> a FAILED or DROPPED relationship. That would allow late-arriving data to be
> stored on HDFS or S3 until a re-indexing task can merge it into the correct
> segment in deep storage.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)