This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new a672294 NIFI-6621: Add support for Druid schema-less dimensions
a672294 is described below
commit a672294f3f3edf2f3fbd05a20d0585dc1a431c79
Author: samhjelmfelt <[email protected]>
AuthorDate: Wed Sep 4 16:20:27 2019 -0500
NIFI-6621: Add support for Druid schema-less dimensions
NIFI-6621: Small change to fix failing tests
NIFI-6621: Minor style changes
Signed-off-by: Matthew Burgess <[email protected]>
This closes #3693
---
.../controller/druid/DruidTranquilityController.java | 18 +++++++++++-------
.../druid/DruidTranquilityControllerTest.java | 2 --
.../druid/MockDruidTranquilityController.java | 3 ++-
3 files changed, 13 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 78aab4c..4d69699 100644
---
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -231,8 +231,7 @@ public class DruidTranquilityController extends
AbstractControllerService implem
public static final PropertyDescriptor DIMENSIONS_LIST = new
PropertyDescriptor.Builder()
.name("druid-cs-dimensions-list")
.displayName("Dimension Fields")
- .description("A comma separated list of field names that will be
stored as dimensions on ingest.")
- .required(true)
+ .description("A comma separated list of field names that will be
stored as dimensions on ingest. Set to empty string for schema-less
dimensions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -400,7 +399,7 @@ public class DruidTranquilityController extends
AbstractControllerService implem
transitUri = String.format(FIREHOSE_PATTERN, dataSource) +
";indexServicePath=" + indexService;
- final List<String> dimensions = getDimensions(dimensionsStringList);
+ final DruidDimensions dimensions = getDimensions(dimensionsStringList);
final List<AggregatorFactory> aggregator =
getAggregatorList(aggregatorJSON);
final Timestamper<Map<String, Object>> timestamper = new
Timestamper<Map<String, Object>>() {
@@ -446,14 +445,14 @@ public class DruidTranquilityController extends
AbstractControllerService implem
}
Beam<Map<String, Object>> buildBeam(String dataSource, String
indexService, String discoveryPath, int clusterPartitions, int
clusterReplication,
- String segmentGranularity, String
queryGranularity, String windowPeriod, String firehoseGracePeriod, String
indexRetryPeriod, List<String> dimensions,
+ String segmentGranularity, String
queryGranularity, String windowPeriod, String firehoseGracePeriod, String
indexRetryPeriod, DruidDimensions dimensions,
List<AggregatorFactory> aggregator,
Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return DruidBeams.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(DruidEnvironment.create(indexService,
FIREHOSE_PATTERN), dataSource))
.timestampSpec(timestampSpec)
-
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator,
QueryGranularity.fromString(queryGranularity)))
+ .rollup(DruidRollup.create(dimensions, aggregator,
QueryGranularity.fromString(queryGranularity)))
.tuning(
ClusteredBeamTuning
.builder()
@@ -518,7 +517,7 @@ public class DruidTranquilityController extends
AbstractControllerService implem
}
}
- private List<String> getDimensions(String dimensionStringList) {
+ private DruidDimensions getDimensions(String dimensionStringList) {
List<String> dimensionList = new ArrayList<>();
if (dimensionStringList != null) {
Arrays.stream(dimensionStringList.split(","))
@@ -526,7 +525,12 @@ public class DruidTranquilityController extends
AbstractControllerService implem
.map(String::trim)
.forEach(dimensionList::add);
}
- return dimensionList;
+ if(dimensionList.isEmpty()) {
+ getLogger().debug("Using schema-less dimensions");
+ return DruidDimensions.schemaless();
+ } else {
+ return DruidDimensions.specific(dimensionList);
+ }
}
private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
diff --git
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
index 56c2616..acccb67 100644
---
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
+++
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java
@@ -55,8 +55,6 @@ public class DruidTranquilityControllerTest {
runner.setProperty(service,
DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
runner.assertNotValid(service);
runner.setProperty(service,
DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\":
\"count\"}]");
- runner.assertNotValid(service);
- runner.setProperty(service,
DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2");
runner.assertValid(service);
}
diff --git
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index 62b0181..f41f164 100644
---
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.druid;
import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;
@@ -137,7 +138,7 @@ public class MockDruidTranquilityController extends
DruidTranquilityController {
@SuppressWarnings("unchecked")
@Override
Beam<Map<String, Object>> buildBeam(String dataSource, String
indexService, String discoveryPath, int clusterPartitions, int
clusterReplication,
- String segmentGranularity, String
queryGranularity, String windowPeriod, String firehoseGracePeriod, String
indexRetryPeriod, List<String> dimensions,
+ String segmentGranularity, String
queryGranularity, String windowPeriod, String firehoseGracePeriod, String
indexRetryPeriod, DruidDimensions dimensions,
List<AggregatorFactory> aggregator,
Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return mock(Beam.class);
}