/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.metron.enrichment.stellar;

import ch.hsr.geohash.WGS84Point;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
import org.junit.Assert;
import org.junit.Test;

import java.util.*;

/**
 * End-to-end tests for the Stellar geohash functions.  Each test evaluates a Stellar
 * expression (GEOHASH_FROM_LATLONG, GEOHASH_TO_LATLONG, GEOHASH_FROM_LOC, GEOHASH_DIST,
 * GEOHASH_MAX_DIST, GEOHASH_CENTROID) via {@link StellarProcessorUtils#run} rather than
 * calling the function implementations directly.
 */
public class GeoHashFunctionsTest {
  // Reference coordinates for three well-known, widely separated landmarks.
  public static WGS84Point empireStatePoint = new WGS84Point(40.748570, -73.985752);
  public static WGS84Point mosconeCenterPoint = new WGS84Point(37.782891, -122.404166);
  public static WGS84Point jutlandPoint = new WGS84Point(57.64911, 10.40740);
  // Known-good 12-character geohash of the Jutland point, stated independently of the
  // code under test so GEOHASH_TO_LATLONG can be checked against a fixed value.
  public static String explicitJutlandHash = "u4pruydqmvpb";

  // Hashes produced by the function under test.  These are instance initializers, so
  // they are re-evaluated for every test-method instance and reused as inputs to the
  // distance and centroid tests below.
  String empireStateHash = (String) StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
          , ImmutableMap.of("lat", empireStatePoint.getLatitude()
                           , "long", empireStatePoint.getLongitude()
                           )
  );
  String mosconeCenterHash = (String) StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
          , ImmutableMap.of("lat", mosconeCenterPoint.getLatitude()
                           , "long", mosconeCenterPoint.getLongitude()
                           )
  );
  String jutlandHash = (String) StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
          , ImmutableMap.of("lat", jutlandPoint.getLatitude()
                           , "long", jutlandPoint.getLongitude()
                           )
  );

  /**
   * GEOHASH_TO_LATLONG on a full-precision hash should recover the original
   * coordinates to within ~1e-3 degrees.
   */
  @Test
  public void testToLatLong_happypath() throws Exception {
    Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
            , ImmutableMap.of("hash", explicitJutlandHash));
    Assert.assertEquals(jutlandPoint.getLatitude(), (double) latLong.get("latitude"), 1e-3);
    Assert.assertEquals(jutlandPoint.getLongitude(), (double) latLong.get("longitude"), 1e-3);
  }

  /**
   * Degenerate hashes: a 1-character hash still yields real (non-NaN) coordinates,
   * an empty hash decodes to (0, 0), and a null hash yields a null result.
   */
  @Test
  public void testToLatLong_degenerate() throws Exception {
    {
      // Single-character hash: very imprecise, but must decode to actual numbers.
      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
              , ImmutableMap.of("hash", "u"));
      Assert.assertFalse(Double.isNaN((double) latLong.get("latitude")));
      Assert.assertFalse(Double.isNaN((double) latLong.get("longitude")));
    }
    {
      // Empty hash decodes to the origin.
      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
              , ImmutableMap.of("hash", ""));
      Assert.assertEquals(0d, (double) latLong.get("latitude"), 1e-3);
      Assert.assertEquals(0d, (double) latLong.get("longitude"), 1e-3);
    }
    {
      // Null input propagates to a null output rather than throwing.
      Map<String, Object> latLong = (Map<String, Object>) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(null)"
              , new HashMap<>());
      Assert.assertNull(latLong);
    }
  }

  /**
   * GEOHASH_FROM_LATLONG: optional third argument controls hash precision
   * (character count); missing or invalid arguments yield null.
   */
  @Test
  public void testHash_fromlatlong() throws Exception {
    // Explicit precision of 10 characters.
    Assert.assertEquals("u4pruydqmv", StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 10)"
            , ImmutableMap.of("lat", jutlandPoint.getLatitude()
                             , "long", jutlandPoint.getLongitude()
                             )
            )
    );

    // Default precision is 12 characters.
    Assert.assertEquals("u4pruydqmvpb", StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
            , ImmutableMap.of("lat", jutlandPoint.getLatitude()
                             , "long", jutlandPoint.getLongitude()
                             )
            )
    );
    // A shorter precision is a strict prefix of the longer hash.
    Assert.assertEquals("u4pruydqmv".substring(0, 6), StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 6)"
            , ImmutableMap.of("lat", jutlandPoint.getLatitude()
                             , "long", jutlandPoint.getLongitude()
                             )
            )
    );
    // Missing longitude argument: null result.
    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat)"
            , ImmutableMap.of("lat", jutlandPoint.getLatitude()
                             )
            )
    );
    // Non-numeric latitude: null result.
    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 10)"
            , ImmutableMap.of("lat", "blah"
                             , "long", jutlandPoint.getLongitude()
                             )
            )
    );
  }

  /**
   * GEOHASH_FROM_LOC: same contract as GEOHASH_FROM_LATLONG, but takes a map with
   * string-valued "latitude"/"longitude" entries instead of two numeric arguments.
   */
  @Test
  public void testHash_fromLocation() throws Exception {
    Map<String, String> loc = ImmutableMap.of("latitude", "" + jutlandPoint.getLatitude()
            , "longitude", "" + jutlandPoint.getLongitude()
    );
    Assert.assertEquals("u4pruydqmv", StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
            , ImmutableMap.of("loc", loc
                             )
            )
    );

    Assert.assertEquals("u4pruydqmv".substring(0, 6), StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 6)"
            , ImmutableMap.of("loc", loc
                             )
            )
    );

    Assert.assertEquals("u4pruydqmvpb", StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
            , ImmutableMap.of("loc", loc
                             )
            )
    );
    // Map missing the "longitude" key: null result.
    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
            , ImmutableMap.of("loc", ImmutableMap.of("latitude", "57.64911"))
            )
    );
    // Non-numeric latitude value: null result.
    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
            , ImmutableMap.of("loc", ImmutableMap.of("latitude", "blah"
                                                    , "longitude", "10.40740"
                                                    )
                             )
            )
    );
  }

  // GEOHASH_DIST with no method argument defaults to haversine; both spellings of
  // the call must agree with the reference distance.
  @Test
  public void testDistanceHaversine() throws Exception {
    testDistance(Optional.empty());
    testDistance(Optional.of("HAVERSINE"));
  }

  @Test
  public void testDistanceLawOfCosines() throws Exception {
    testDistance(Optional.of("LAW_OF_COSINES"));
  }

  // NOTE(review): "VICENTY" (sic, usually spelled "Vincenty") must match the
  // strategy name the implementation expects — confirm against GeoHashFunctions.
  @Test
  public void testDistanceLawOfVicenty() throws Exception {
    testDistance(Optional.of("VICENTY"));
  }

  /** GEOHASH_MAX_DIST returns the greatest pairwise distance (km) in the list. */
  @Test
  public void testMaxDistance_happyPath() throws Exception {
    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([empireState, mosconeCenter, jutland])"
            , ImmutableMap.of("empireState", empireStateHash
                             , "mosconeCenter", mosconeCenterHash
                             , "jutland", jutlandHash
                             )
    );
    // Empire State <-> Jutland is the farthest pair, roughly 8528 km apart.
    double expectedDistance = 8528;
    Assert.assertEquals(expectedDistance, maxDistance, 1d);
  }

  /** Result must not depend on the order of the input hashes. */
  @Test
  public void testMaxDistance_differentOrder() throws Exception {
    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, empireState])"
            , ImmutableMap.of("empireState", empireStateHash
                             , "mosconeCenter", mosconeCenterHash
                             , "jutland", jutlandHash
                             )
    );
    double expectedDistance = 8528;
    Assert.assertEquals(expectedDistance, maxDistance, 1d);
  }

  /** Null entries in the list are ignored rather than poisoning the result. */
  @Test
  public void testMaxDistance_withNulls() throws Exception {
    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, empireState, null])"
            , ImmutableMap.of("empireState", empireStateHash
                             , "mosconeCenter", mosconeCenterHash
                             , "jutland", jutlandHash
                             )
    );
    double expectedDistance = 8528;
    Assert.assertEquals(expectedDistance, maxDistance, 1d);
  }

  /** All-identical points have a max pairwise distance of zero. */
  @Test
  public void testMaxDistance_allSame() throws Exception {
    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, jutland, jutland])"
            , ImmutableMap.of("jutland", jutlandHash)
    );
    Assert.assertEquals(0, maxDistance, 1e-6d);
  }

  /** An empty list has no pairwise distances, so the result is NaN. */
  @Test
  public void testMaxDistance_emptyList() throws Exception {
    Double maxDistance = (double) StellarProcessorUtils.run("GEOHASH_MAX_DIST([])", new HashMap<>());
    Assert.assertTrue(Double.isNaN(maxDistance));
  }

  /** A null list yields a null result. */
  @Test
  public void testMaxDistance_nullList() throws Exception {
    Double maxDistance = (Double) StellarProcessorUtils.run("GEOHASH_MAX_DIST(null)", new HashMap<>());
    Assert.assertNull(maxDistance);
  }

  /** Calling with no argument at all also yields null. */
  @Test
  public void testMaxDistance_invalidList() throws Exception {
    Double maxDistance = (Double) StellarProcessorUtils.run("GEOHASH_MAX_DIST()", new HashMap<>());
    Assert.assertNull(maxDistance);
  }

  /**
   * Shared driver for the GEOHASH_DIST tests: checks that d(x, y) == d(y, x) and that
   * both match the reference distance between the Empire State Building and Moscone
   * Center (~4128 km) to within 1 km.
   *
   * @param method distance strategy name to pass as the third argument, or empty to
   *               exercise the default
   */
  public void testDistance(Optional<String> method) throws Exception {
    double expectedDistance = 4128; //in kilometers
    Map<String, Object> vars = ImmutableMap.of("empireState", empireStateHash, "mosconeCenter", mosconeCenterHash);
    //ensure that d(x, y) == d(y, x) and that both are the same as the expected (up to 1 km accuracy)
    {
      String stellarStatement = getDistStellarStatement(ImmutableList.of("mosconeCenter", "empireState"), method);
      Assert.assertEquals(expectedDistance, (double) StellarProcessorUtils.run(stellarStatement, vars), 1D);
    }
    {
      String stellarStatement = getDistStellarStatement(ImmutableList.of("empireState", "mosconeCenter"), method);
      Assert.assertEquals(expectedDistance, (double) StellarProcessorUtils.run(stellarStatement, vars), 1D);
    }
  }

  /**
   * Builds a "GEOHASH_DIST(var, var[, 'METHOD'])" Stellar statement from variable
   * names plus an optional, single-quoted method literal.
   */
  private static String getDistStellarStatement(List<String> hashVariables, Optional<String> method) {
    if (method.isPresent()) {
      List<String> vars = new ArrayList<>();
      vars.addAll(hashVariables);
      vars.add("\'" + method.get() + "\'");
      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(vars) + ")";
    }
    else {
      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(hashVariables) + ")";
    }
  }

  /**
   * GEOHASH_CENTROID over a list of hashes, validated by round-tripping the centroid
   * hash back through GEOHASH_TO_LATLONG.
   */
  @Test
  public void testCentroid_List() throws Exception {
    //happy path
    {
      double expectedLong = -98.740087 //calculated via http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
           , expectedLat = 41.86921
           ;
      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, mosconeCenter]))"
              , ImmutableMap.of("empireState", empireStateHash, "mosconeCenter", mosconeCenterHash)
      );
      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
    }
    //same point: the centroid of a point with itself is that point
    {
      double expectedLong = empireStatePoint.getLongitude()
           , expectedLat = empireStatePoint.getLatitude()
           ;
      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, empireState]))"
              , ImmutableMap.of("empireState", empireStateHash)
      );
      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
    }
    //one point: the centroid of a single point is itself
    {
      double expectedLong = empireStatePoint.getLongitude()
           , expectedLat = empireStatePoint.getLatitude()
           ;
      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState]))"
              , ImmutableMap.of("empireState", empireStateHash)
      );
      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
    }
    //no points: centroid of an empty list is null
    {
      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([]))"
              , new HashMap<>()
      );
      Assert.assertNull(centroid);
    }
  }

  /**
   * GEOHASH_CENTROID over a hash-to-weight map.  Equal weights must give the same
   * centroid regardless of the weight magnitude.
   */
  @Test
  public void testCentroid_weighted() throws Exception {
    //happy path
    {
      double expectedLong = -98.740087 //calculated via http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
           , expectedLat = 41.86921
           ;
      for (int weight = 1; weight < 10; ++weight) {
        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, weight, mosconeCenterHash, weight);
        Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
                , ImmutableMap.of("weightedPoints", weightedPoints)
        );
        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
      }
    }
    //same point: a single weighted point is its own centroid at any weight
    {
      double expectedLong = empireStatePoint.getLongitude()
           , expectedLat = empireStatePoint.getLatitude()
           ;
      for (int weight = 1; weight < 10; ++weight) {
        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, weight);
        Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
                , ImmutableMap.of("weightedPoints", weightedPoints)
        );
        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
      }
    }
    //no points: an empty weight map yields a null centroid
    {
      Map<Object, Integer> weightedPoints = new HashMap<>();
      Map<String, Double> centroid = (Map) StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
              , ImmutableMap.of("weightedPoints", weightedPoints)
      );
      Assert.assertNull(centroid);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-hbase-client/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase-client/pom.xml b/metron-platform/metron-hbase-client/pom.xml index 5dd6127..1237be7 100644 --- a/metron-platform/metron-hbase-client/pom.xml +++ b/metron-platform/metron-hbase-client/pom.xml @@ -80,6 +80,16 @@ <goal>shade</goal> </goals> <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>org.apache.commons.logging</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml index c64c374..7d07665 100644 --- a/metron-platform/metron-indexing/pom.xml +++ b/metron-platform/metron-indexing/pom.xml @@ -222,6 +222,16 @@ <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java index 
ddb88e5..4f47a65 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java @@ -17,7 +17,6 @@ */ package org.apache.metron.indexing.dao; -import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import java.util.HashMap; @@ -26,6 +25,7 @@ import java.util.function.Supplier; public class AccessConfig { private Integer maxSearchResults; + private Integer maxSearchGroups; private Supplier<Map<String, Object>> globalConfigSupplier; private Map<String, String> optionalSettings = new HashMap<>(); private TableProvider tableProvider = null; @@ -55,6 +55,18 @@ public class AccessConfig { } /** + * The maximum search groups. + * @return + */ + public Integer getMaxSearchGroups() { + return maxSearchGroups; + } + + public void setMaxSearchGroups(Integer maxSearchGroups) { + this.maxSearchGroups = maxSearchGroups; + } + + /** * Get optional settings for initializing indices. 
* @return */ http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java index a1cf398..c890544 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -65,6 +67,11 @@ public class HBaseDao implements IndexDao { } @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return null; + } + + @Override public synchronized void init(AccessConfig config) { if(this.tableInterface == null) { this.config = config; http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java index 350e402..745dccd 100644 --- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.flipkart.zjsonpatch.JsonPatch; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -33,7 +35,6 @@ import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import java.io.IOException; import org.apache.metron.indexing.dao.search.FieldType; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -49,6 +50,8 @@ public interface IndexDao { */ SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException; + GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException; + /** * Initialize the DAO with the AccessConfig object. 
* @param config http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java index e9a4a9a..61c6231 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java @@ -22,6 +22,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -118,6 +120,17 @@ public class MultiIndexDao implements IndexDao { } @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + for(IndexDao dao : indices) { + GroupResponse s = dao.group(groupRequest); + if(s != null) { + return s; + } + } + return null; + } + + @Override public void init(AccessConfig config) { for(IndexDao dao : indices) { dao.init(config); http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements.  See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with the License.  You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.metron.indexing.dao.search;

/**
 * A single grouping criterion for a {@link GroupRequest}: the name of the field whose
 * values define the result buckets, plus the ordering applied to those buckets.
 */
public class Group {

  // Ordering of the buckets produced for this field; defaulted in the constructor.
  private GroupOrder order;
  // Name of the field to group on.
  private String field;

  public Group() {
    // Default bucket ordering: sort by the bucket's term value, descending.
    order = new GroupOrder();
    order.setGroupOrderType(GroupOrderType.TERM.toString());
    order.setSortOrder(SortOrder.DESC.toString());
  }

  public GroupOrder getOrder() {
    return order;
  }

  public void setOrder(GroupOrder order) {
    this.order = order;
  }

  public String getField() {
    return field;
  }

  public void setField(String field) {
    this.field = field;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements.  See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with the License.  You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.metron.indexing.dao.search;

/**
 * Ordering of the buckets produced by a {@link Group}: what to order by
 * ({@link GroupOrderType}) and in which direction ({@link SortOrder}).
 *
 * <p>NOTE(review): the setters intentionally take {@code String} while the getters
 * return the enum types — presumably so JSON deserialization can accept string
 * values case-insensitively; confirm against the request-parsing code.  Both
 * setters throw {@link IllegalArgumentException} on unrecognized values.
 */
public class GroupOrder {

  private SortOrder sortOrder;
  private GroupOrderType groupOrderType;

  public SortOrder getSortOrder() {
    return sortOrder;
  }

  // Accepts the enum name in any case (converted via SortOrder.fromString).
  public void setSortOrder(String sortOrder) {
    this.sortOrder = SortOrder.fromString(sortOrder);
  }

  public GroupOrderType getGroupOrderType() {
    return groupOrderType;
  }

  // Accepts the enum name in any case (converted via GroupOrderType.fromString).
  public void setGroupOrderType(String groupOrderType) {
    this.groupOrderType = GroupOrderType.fromString(groupOrderType);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements.  See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with the License.  You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.metron.indexing.dao.search;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * What a {@link GroupOrder} sorts buckets by: the bucket's document count or its
 * term value.  Serialized to/from JSON as the lowercase strings "count" and "term".
 */
public enum GroupOrderType {

  @JsonProperty("count")
  COUNT("count"),
  @JsonProperty("term")
  TERM("term");

  // Lowercase wire name of this ordering type.
  private String groupOrderType;

  GroupOrderType(String groupOrderType) {
    this.groupOrderType = groupOrderType;
  }

  public String getGroupOrderType() {
    return groupOrderType;
  }

  /**
   * Case-insensitive lookup by enum name.
   *
   * @throws IllegalArgumentException if the value matches no constant
   * @throws NullPointerException if {@code groupOrderType} is null
   */
  public static GroupOrderType fromString(String groupOrderType) {
    return GroupOrderType.valueOf(groupOrderType.toUpperCase());
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements.  See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with the License.  You may obtain
 * a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.metron.indexing.dao.search;

import java.util.List;
import java.util.Optional;

/**
 * A request to bucket the documents matching a query by one or more fields.
 */
public class GroupRequest {

  // Indices the grouping query runs against.
  private List<String> indices;
  // Query string selecting the documents to be grouped.
  private String query;
  // Optional field used when scoring groups; may legitimately be absent.
  private String scoreField;
  // Grouping criteria; groups.get(0) is the outermost bucketing.
  private List<Group> groups;

  public List<String> getIndices() {
    return indices;
  }

  public void setIndices(List<String> indices) {
    this.indices = indices;
  }

  public String getQuery() {
    return query;
  }

  public void setQuery(String query) {
    this.query = query;
  }

  /**
   * The score field, if one was supplied.
   * NOTE(review): presumably consulted by DAO implementations when computing group
   * scores — confirm against the concrete IndexDao implementations.
   *
   * @return the field name, or {@link Optional#empty()} when unset
   */
  public Optional<String> getScoreField() {
    return Optional.ofNullable(scoreField);
  }

  public void setScoreField(String scoreField) {
    this.scoreField = scoreField;
  }

  public List<Group> getGroups() {
    return groups;
  }

  public void setGroups(List<Group> groups) {
    this.groups = groups;
  }
}
You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.search; + +import java.util.List; + +public class GroupResponse { + + private String groupedBy; + private List<GroupResult> groupResults; + + public String getGroupedBy() { + return groupedBy; + } + + public void setGroupedBy(String groupedBy) { + this.groupedBy = groupedBy; + } + + public List<GroupResult> getGroupResults() { + return groupResults; + } + + public void setGroupResults(List<GroupResult> groupResults) { + this.groupResults = groupResults; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java new file mode 100644 index 0000000..d40f146 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.dao.search; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.List; + +public class GroupResult { + + private String key; + private long total; + private Double score; + private String groupedBy; + private List<GroupResult> groupResults; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public long getTotal() { + return total; + } + + public void setTotal(long total) { + this.total = total; + } + + public Double getScore() { + return score; + } + + public void setScore(Double score) { + this.score = score; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getGroupedBy() { + return groupedBy; + } + + public void setGroupedBy(String groupedBy) { + this.groupedBy = groupedBy; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public List<GroupResult> getGroupResults() { + return groupResults; + } + + public void setGroupResults(List<GroupResult> groups) { + this.groupResults = groups; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java index 2d146e0..6e48b58 100644 --- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java @@ -22,7 +22,6 @@ import com.google.common.base.Splitter; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; import org.apache.metron.common.Constants; -import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.*; import org.apache.metron.indexing.dao.update.Document; @@ -77,6 +76,27 @@ public class InMemoryDao implements IndexDao { return ret; } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest.getGroups(), 0)); + return groupResponse; + } + + private List<GroupResult> getGroupResults(List<Group> groups, int index) { + Group group = groups.get(index); + GroupResult groupResult = new GroupResult(); + groupResult.setKey(group.getField() + "_value"); + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groups, index + 1)); + } else { + groupResult.setScore(50.0); + } + groupResult.setTotal(10); + return Collections.singletonList(groupResult); + } private static class ComparableComparator implements Comparator<Comparable> { SortOrder order = null; http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 2645df2..0db8e37 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -20,10 +20,13 @@ package org.apache.metron.indexing.dao; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.GroupResult; import org.apache.metron.integration.InMemoryComponent; import org.junit.After; import org.junit.Assert; @@ -40,11 +43,11 @@ import java.util.Map; public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", 
"duplicate_name_field": "data 3"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} + * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} * ] */ @Multiline @@ -52,11 +55,11 @@ public abstract class SearchIntegrationTest { /** * [ - * 
{"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} + * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": 
"48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} * ] */ @Multiline @@ -149,7 +152,7 @@ public abstract class SearchIntegrationTest { /** * { - * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"], + * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "score", "is_alert"], * "indices": ["bro", "snort"], * "query": "*", * "from": 0, @@ -253,6 +256,63 @@ public abstract class SearchIntegrationTest { @Multiline public static String noResultsFieldsQuery; + /** + * { + * "groups": [ + * { + * "field":"is_alert" + * }, + * { + * "field":"latitude" + * } + * ], + * "scoreField":"score", + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String groupByQuery; + + /** + * { + * "groups": [ + * { + * "field":"is_alert", + * "order": { + * "groupOrderType": "count", + * "sortOrder": "ASC" + * } + * }, + * { + * "field":"ip_src_addr", + * "order": { + * "groupOrderType": "term", + * "sortOrder": "DESC" + * } + * } + * ], + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String sortedGroupByQuery; + + /** + * { + * "groups": [ + * { + * "field":"location_point" + * } + * ], + * "indices": ["bro", "snort"], + * "query": "*" + * } + */ + @Multiline + public static String badGroupQuery; + protected static IndexDao dao; protected static InMemoryComponent indexComponent; @@ 
-387,14 +447,18 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001); Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0))); Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1))); - Map<String, Long> doubleFieldCounts = facetCounts.get("double_field"); - Assert.assertEquals(2, doubleFieldCounts.size()); - List<String> doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet()); - Collections.sort(doubleFieldKeys); - Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001); - Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001); - Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0))); - Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1))); + Map<String, Long> scoreFieldCounts = facetCounts.get("score"); + Assert.assertEquals(4, scoreFieldCounts.size()); + List<String> scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet()); + Collections.sort(scoreFieldKeys); + Assert.assertEquals(10.0, Double.parseDouble(scoreFieldKeys.get(0)), 0.00001); + Assert.assertEquals(20.0, Double.parseDouble(scoreFieldKeys.get(1)), 0.00001); + Assert.assertEquals(50.0, Double.parseDouble(scoreFieldKeys.get(2)), 0.00001); + Assert.assertEquals(98.0, Double.parseDouble(scoreFieldKeys.get(3)), 0.00001); + Assert.assertEquals(new Long(4), scoreFieldCounts.get(scoreFieldKeys.get(0))); + Assert.assertEquals(new Long(2), scoreFieldCounts.get(scoreFieldKeys.get(1))); + Assert.assertEquals(new Long(3), scoreFieldCounts.get(scoreFieldKeys.get(2))); + Assert.assertEquals(new Long(1), scoreFieldCounts.get(scoreFieldKeys.get(3))); Map<String, Long> isAlertCounts = facetCounts.get("is_alert"); Assert.assertEquals(2, isAlertCounts.size()); Assert.assertEquals(new Long(6), isAlertCounts.get("true")); @@ -440,7 +504,7 @@ public abstract class SearchIntegrationTest { 
Assert.assertEquals(FieldType.LONG, broTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, broTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point")); Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field")); @@ -453,7 +517,7 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point")); Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field")); @@ -486,7 +550,7 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); } @@ -527,6 +591,145 @@ public abstract class SearchIntegrationTest { SearchResponse response = dao.search(request); Assert.assertEquals(0, response.getTotal()); } + // Group by test case, default order is count descending + { + GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, GroupRequest.class); 
+ GroupResponse response = dao.group(request); + Assert.assertEquals("is_alert", response.getGroupedBy()); + List<GroupResult> isAlertGroups = response.getGroupResults(); + Assert.assertEquals(2, isAlertGroups.size()); + + // isAlert == true group + GroupResult trueGroup = isAlertGroups.get(0); + Assert.assertEquals("true", trueGroup.getKey()); + Assert.assertEquals(6, trueGroup.getTotal()); + Assert.assertEquals("latitude", trueGroup.getGroupedBy()); + Assert.assertEquals(198.0, trueGroup.getScore(), 0.00001); + List<GroupResult> trueLatitudeGroups = trueGroup.getGroupResults(); + Assert.assertEquals(2, trueLatitudeGroups.size()); + + + // isAlert == true && latitude == 48.5839 group + GroupResult trueLatitudeGroup2 = trueLatitudeGroups.get(0); + Assert.assertEquals(48.5839, Double.parseDouble(trueLatitudeGroup2.getKey()), 0.00001); + Assert.assertEquals(5, trueLatitudeGroup2.getTotal()); + Assert.assertEquals(148.0, trueLatitudeGroup2.getScore(), 0.00001); + + // isAlert == true && latitude == 48.0001 group + GroupResult trueLatitudeGroup1 = trueLatitudeGroups.get(1); + Assert.assertEquals(48.0001, Double.parseDouble(trueLatitudeGroup1.getKey()), 0.00001); + Assert.assertEquals(1, trueLatitudeGroup1.getTotal()); + Assert.assertEquals(50.0, trueLatitudeGroup1.getScore(), 0.00001); + + // isAlert == false group + GroupResult falseGroup = isAlertGroups.get(1); + Assert.assertEquals("false", falseGroup.getKey()); + Assert.assertEquals("latitude", falseGroup.getGroupedBy()); + Assert.assertEquals(130.0, falseGroup.getScore(), 0.00001); + List<GroupResult> falseLatitudeGroups = falseGroup.getGroupResults(); + Assert.assertEquals(2, falseLatitudeGroups.size()); + + // isAlert == false && latitude == 48.5839 group + GroupResult falseLatitudeGroup2 = falseLatitudeGroups.get(0); + Assert.assertEquals(48.5839, Double.parseDouble(falseLatitudeGroup2.getKey()), 0.00001); + Assert.assertEquals(3, falseLatitudeGroup2.getTotal()); + Assert.assertEquals(80.0, 
falseLatitudeGroup2.getScore(), 0.00001); + + // isAlert == false && latitude == 48.0001 group + GroupResult falseLatitudeGroup1 = falseLatitudeGroups.get(1); + Assert.assertEquals(48.0001, Double.parseDouble(falseLatitudeGroup1.getKey()), 0.00001); + Assert.assertEquals(1, falseLatitudeGroup1.getTotal()); + Assert.assertEquals(50.0, falseLatitudeGroup1.getScore(), 0.00001); + } + // Group by with sorting test case where is_alert is sorted by count ascending and ip_src_addr is sorted by term descending + { + GroupRequest request = JSONUtils.INSTANCE.load(sortedGroupByQuery, GroupRequest.class); + GroupResponse response = dao.group(request); + Assert.assertEquals("is_alert", response.getGroupedBy()); + List<GroupResult> isAlertGroups = response.getGroupResults(); + Assert.assertEquals(2, isAlertGroups.size()); + + // isAlert == false group + GroupResult falseGroup = isAlertGroups.get(0); + Assert.assertEquals(4, falseGroup.getTotal()); + Assert.assertEquals("ip_src_addr", falseGroup.getGroupedBy()); + List<GroupResult> falseIpSrcAddrGroups = falseGroup.getGroupResults(); + Assert.assertEquals(4, falseIpSrcAddrGroups.size()); + + // isAlert == false && ip_src_addr == 192.168.1.8 group + GroupResult falseIpSrcAddrGroup1 = falseIpSrcAddrGroups.get(0); + Assert.assertEquals("192.168.1.8", falseIpSrcAddrGroup1.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup1.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup1.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup1.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.7 group + GroupResult falseIpSrcAddrGroup2 = falseIpSrcAddrGroups.get(1); + Assert.assertEquals("192.168.1.7", falseIpSrcAddrGroup2.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup2.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup2.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup2.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.6 group + GroupResult falseIpSrcAddrGroup3 = 
falseIpSrcAddrGroups.get(2); + Assert.assertEquals("192.168.1.6", falseIpSrcAddrGroup3.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup3.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup3.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup3.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.2 group + GroupResult falseIpSrcAddrGroup4 = falseIpSrcAddrGroups.get(3); + Assert.assertEquals("192.168.1.2", falseIpSrcAddrGroup4.getKey()); + Assert.assertEquals(1, falseIpSrcAddrGroup4.getTotal()); + Assert.assertNull(falseIpSrcAddrGroup4.getGroupedBy()); + Assert.assertNull(falseIpSrcAddrGroup4.getGroupResults()); + + // isAlert == false group + GroupResult trueGroup = isAlertGroups.get(1); + Assert.assertEquals(6, trueGroup.getTotal()); + Assert.assertEquals("ip_src_addr", trueGroup.getGroupedBy()); + List<GroupResult> trueIpSrcAddrGroups = trueGroup.getGroupResults(); + Assert.assertEquals(4, trueIpSrcAddrGroups.size()); + + // isAlert == false && ip_src_addr == 192.168.1.5 group + GroupResult trueIpSrcAddrGroup1 = trueIpSrcAddrGroups.get(0); + Assert.assertEquals("192.168.1.5", trueIpSrcAddrGroup1.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup1.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup1.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup1.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.4 group + GroupResult trueIpSrcAddrGroup2 = trueIpSrcAddrGroups.get(1); + Assert.assertEquals("192.168.1.4", trueIpSrcAddrGroup2.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup2.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup2.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup2.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.3 group + GroupResult trueIpSrcAddrGroup3 = trueIpSrcAddrGroups.get(2); + Assert.assertEquals("192.168.1.3", trueIpSrcAddrGroup3.getKey()); + Assert.assertEquals(1, trueIpSrcAddrGroup3.getTotal()); + 
Assert.assertNull(trueIpSrcAddrGroup3.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup3.getGroupResults()); + + // isAlert == false && ip_src_addr == 192.168.1.1 group + GroupResult trueIpSrcAddrGroup4 = trueIpSrcAddrGroups.get(3); + Assert.assertEquals("192.168.1.1", trueIpSrcAddrGroup4.getKey()); + Assert.assertEquals(3, trueIpSrcAddrGroup4.getTotal()); + Assert.assertNull(trueIpSrcAddrGroup4.getGroupedBy()); + Assert.assertNull(trueIpSrcAddrGroup4.getGroupResults()); + } + //Bad group query + { + GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class); + try { + dao.group(request); + Assert.fail("Exception expected, but did not come."); + } + catch(InvalidSearchException ise) { + Assert.assertEquals("Could not execute search", ise.getMessage()); + } + } } @AfterClass http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/pom.xml b/metron-platform/metron-management/pom.xml index 4117d69..a5cae38 100644 --- a/metron-platform/metron-management/pom.xml +++ b/metron-platform/metron-management/pom.xml @@ -205,6 +205,16 @@ <goal>shade</goal> </goals> <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.google.common</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java index 7574418..bf3342c 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java @@ -82,7 +82,15 @@ public class ConfigurationFunctionsTest { "parserConfig" : { }, "fieldTransformations" : [ ], "readMetadata":false, - "mergeMetadata":false + "mergeMetadata":false, + "parserParallelism" : 1, + "errorWriterParallelism" : 1, + "spoutNumTasks" : 1, + "stormConfig" : {}, + "errorWriterNumTasks":1, + "spoutConfig":{}, + "parserNumTasks":1, + "spoutParallelism":1 } */ @Multiline http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index ea4f1dd..d4a984d 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -103,6 +103,17 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default). See below for a discussion about metadata. * `parserConfig` : A JSON Map representing the parser implementation specific configuration. * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. +* `spoutParallelism` : The kafka spout parallelism (default to `1`). This can be overridden on the command line. +* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line. +* `parserParallelism` : The parser bolt parallelism (default to `1`). 
This can be overridden on the command line. +* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). This can be overridden on the command line. +* `errorWriterParallelism` : The error writer bolt parallelism (default to `1`). This can be overridden on the command line. +* `errorWriterNumTasks` : The number of tasks for the error writer bolt (default to `1`). This can be overridden on the command line. +* `numWorkers` : The number of workers to use in the topology (default is the storm default of `1`). +* `numAckers` : The number of acker executors to use in the topology (default is the storm default of `1`). +* `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line. +* `securityProtocol` : The security protocol to use for reading from kafka (this is a string). This can be overridden on the command line and also specified in the spout config via the `security.protocol` key. If both are specified, then they are merged and the CLI will take precedence. +* `stormConfig` : The storm config to use (this is a map). This can be overridden on the command line. If both are specified, they are merged with CLI properties taking precedence. The `fieldTransformations` is a complex object which defines a transformation which can be done to a message. 
This transformation can http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml index b3b5657..b80343a 100644 --- a/metron-platform/metron-parsers/pom.xml +++ b/metron-platform/metron-parsers/pom.xml @@ -276,6 +276,16 @@ <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <relocations> <relocation> <pattern>com.fasterxml.jackson</pattern> http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index 53d3d99..6c0dc28 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -18,9 +18,11 @@ package org.apache.metron.parsers.topology; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.metron.storm.kafka.flux.StormKafkaSpout; +import org.apache.storm.Config; import 
org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; @@ -47,34 +49,57 @@ import java.util.*; */ public class ParserTopologyBuilder { + public static class ParserTopology { + private TopologyBuilder builder; + private Config topologyConfig; + + private ParserTopology(TopologyBuilder builder, Config topologyConfig) { + this.builder = builder; + this.topologyConfig = topologyConfig; + } + + + public TopologyBuilder getBuilder() { + return builder; + } + + public Config getTopologyConfig() { + return topologyConfig; + } + } + /** * Builds a Storm topology that parses telemetry data received from an external sensor. * * @param zookeeperUrl Zookeeper URL * @param brokerUrl Kafka Broker URL * @param sensorType Type of sensor - * @param spoutParallelism Parallelism hint for the spout - * @param spoutNumTasks Number of tasks for the spout - * @param parserParallelism Parallelism hint for the parser bolt - * @param parserNumTasks Number of tasks for the parser bolt - * @param errorWriterParallelism Parallelism hint for the bolt that handles errors - * @param errorWriterNumTasks Number of tasks for the bolt that handles errors - * @param kafkaSpoutConfig Configuration options for the kafka spout + * @param spoutParallelismSupplier Supplier for the parallelism hint for the spout + * @param spoutNumTasksSupplier Supplier for the number of tasks for the spout + * @param parserParallelismSupplier Supplier for the parallelism hint for the parser bolt + * @param parserNumTasksSupplier Supplier for the number of tasks for the parser bolt + * @param errorWriterParallelismSupplier Supplier for the parallelism hint for the bolt that handles errors + * @param errorWriterNumTasksSupplier Supplier for the number of tasks for the bolt that handles errors + * @param kafkaSpoutConfigSupplier Supplier for the configuration options for the kafka spout + * @param securityProtocolSupplier Supplier for the 
security protocol + * @param outputTopic The output kafka topic + * @param stormConfigSupplier Supplier for the storm config * @return A Storm topology that parses telemetry data received from an external sensor * @throws Exception */ - public static TopologyBuilder build(String zookeeperUrl, + public static ParserTopology build(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, - int spoutParallelism, - int spoutNumTasks, - int parserParallelism, - int parserNumTasks, - int errorWriterParallelism, - int errorWriterNumTasks, - Map<String, Object> kafkaSpoutConfig, - Optional<String> securityProtocol, - Optional<String> outputTopic + ValueSupplier<Integer> spoutParallelismSupplier, + ValueSupplier<Integer> spoutNumTasksSupplier, + ValueSupplier<Integer> parserParallelismSupplier, + ValueSupplier<Integer> parserNumTasksSupplier, + ValueSupplier<Integer> errorWriterParallelismSupplier, + ValueSupplier<Integer> errorWriterNumTasksSupplier, + ValueSupplier<Map> kafkaSpoutConfigSupplier, + ValueSupplier<String> securityProtocolSupplier, + Optional<String> outputTopic, + ValueSupplier<Config> stormConfigSupplier ) throws Exception { @@ -82,6 +107,14 @@ public class ParserTopologyBuilder { // fetch configuration from zookeeper ParserConfigurations configs = new ParserConfigurations(); SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs); + int spoutParallelism = spoutParallelismSupplier.get(parserConfig, Integer.class); + int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class); + int parserParallelism = parserParallelismSupplier.get(parserConfig, Integer.class); + int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class); + int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class); + int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class); + Map<String, Object> kafkaSpoutConfig = 
kafkaSpoutConfigSupplier.get(parserConfig, Map.class); + Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class)); // create the spout TopologyBuilder builder = new TopologyBuilder(); @@ -103,7 +136,7 @@ public class ParserTopologyBuilder { .shuffleGrouping("parserBolt", Constants.ERROR_STREAM); } - return builder; + return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, Config.class)); } /** @@ -243,16 +276,16 @@ public class ParserTopologyBuilder { * @throws Exception */ private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception { - CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl); - client.start(); - ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client); - SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType); - if (parserConfig == null) { - throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." + - " Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command."); + try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) { + client.start(); + ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client); + SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType); + if (parserConfig == null) { + throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." 
+ + " Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command."); + } + return parserConfig; } - client.close(); - return parserConfig; } /** http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 8ff4f93..b5ee628 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -18,10 +18,14 @@ package org.apache.metron.parsers.topology; import org.apache.metron.common.Constants; +import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import com.fasterxml.jackson.core.type.TypeReference; @@ -235,12 +239,19 @@ public class ParserTopologyCLI { return has(cli)?cli.getOptionValue(shortCode):def; } - public static Config getConfig(CommandLine cli) { - Config config = new Config(); + public static Optional<Config> getConfig(CommandLine cli) { + return getConfig(cli, new Config()); + } + + public static Optional<Config> getConfig(CommandLine cli, Config config) { + if(EXTRA_OPTIONS.has(cli)) { + Map<String, Object> extraOptions = 
readJSONMapFromFile(new File(EXTRA_OPTIONS.get(cli))); + config.putAll(extraOptions); + } for(ParserOptions option : ParserOptions.values()) { config = option.configHandler.apply(new Arg(config, option.get(cli))); } - return config; + return config.isEmpty()?Optional.empty():Optional.of(config); } public static CommandLine parse(CommandLineParser parser, String[] args) throws ParseException { @@ -273,65 +284,172 @@ public class ParserTopologyCLI { } } + private static CommandLine parse(Options options, String[] args) { + CommandLineParser parser = new PosixParser(); + try { + return ParserOptions.parse(parser, args); + } catch (ParseException pe) { + pe.printStackTrace(); + final HelpFormatter usageFormatter = new HelpFormatter(); + usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); + System.exit(-1); + return null; + } + } + + public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception { + String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd); + Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); + String sensorType= ParserOptions.SENSOR_TYPE.get(cmd); + + /* + It bears mentioning why we're creating this ValueSupplier indirection here. + As a separation of responsibilities, the CLI class defines the order of precedence + for the various topological and structural properties for creating a parser. This is + desirable because there are now (i.e. integration tests) + and may be in the future (i.e. a REST service to start parsers without using the CLI) + other mechanisms to construct parser topologies. It's sensible to split those concerns.. + + Unfortunately, determining the structural parameters for a parser requires interacting with + external services (e.g. zookeeper) that are set up well within the ParserTopology class. 
+ Rather than pulling the infrastructure to interact with those services out and moving it into the + CLI class and breaking that separation of concerns, we've created a supplier + indirection where we are providing the logic as to how to create precedence in the CLI class + without owning the responsibility of constructing the infrastructure where the values are + necessarily supplied. + + */ + ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1); + }; + ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1); + }; + ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> { + if(ParserOptions.PARSER_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1); + }; + + ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1); + }; + + ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> { + if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) { + return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); + } + return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1); + }; + + ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> { + if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) { + return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, 
"1")); + } + return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1); + }; + + ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> { + if(ParserOptions.SPOUT_CONFIG.has(cmd)) { + return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); + } + return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>()); + }; + + ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> { + Optional<String> sp = Optional.empty(); + if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) { + sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)); + } + if (!sp.isPresent()) { + sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class)); + } + return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null)); + }; + + ValueSupplier<Config> stormConf = (parserConfig, clazz) -> { + Map<String, Object> c = parserConfig.getStormConfig(); + Config finalConfig = new Config(); + if(c != null && !c.isEmpty()) { + finalConfig.putAll(c); + } + if(parserConfig.getNumAckers() != null) { + Config.setNumAckers(finalConfig, parserConfig.getNumAckers()); + } + if(parserConfig.getNumWorkers() != null) { + Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers()); + } + return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig); + }; + + Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); + + return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic); + } + + protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl + , Optional<String> brokerUrl + , String sensorType + , ValueSupplier<Integer> spoutParallelism + , ValueSupplier<Integer> spoutNumTasks + , ValueSupplier<Integer> parserParallelism + , ValueSupplier<Integer> 
parserNumTasks + , ValueSupplier<Integer> errorParallelism + , ValueSupplier<Integer> errorNumTasks + , ValueSupplier<Map> spoutConfig + , ValueSupplier<String> securityProtocol + , ValueSupplier<Config> stormConf + , Optional<String> outputTopic + ) throws Exception + { + return ParserTopologyBuilder.build(zookeeperUrl, + brokerUrl, + sensorType, + spoutParallelism, + spoutNumTasks, + parserParallelism, + parserNumTasks, + errorParallelism, + errorNumTasks, + spoutConfig, + securityProtocol, + outputTopic, + stormConf + ); + } + + public static void main(String[] args) { - Options options = new Options(); try { - CommandLineParser parser = new PosixParser(); - CommandLine cmd = null; - try { - cmd = ParserOptions.parse(parser, args); - } catch (ParseException pe) { - pe.printStackTrace(); - final HelpFormatter usageFormatter = new HelpFormatter(); - usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); - System.exit(-1); - } + Options options = new Options(); + final CommandLine cmd = parse(options, args); if (cmd.hasOption("h")) { final HelpFormatter usageFormatter = new HelpFormatter(); usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); System.exit(0); } - String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);; - Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); + ParserTopologyCLI cli = new ParserTopologyCLI(); + ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd); String sensorType= ParserOptions.SENSOR_TYPE.get(cmd); - int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")); - int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")); - int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); - int parserNumTasks= Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); - int errorParallelism = 
Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); - int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); - int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1")); - int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1")); - Map<String, Object> spoutConfig = new HashMap<>(); - if(ParserOptions.SPOUT_CONFIG.has(cmd)) { - spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); - } - Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty(); - Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty(); - securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig); - TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, - brokerUrl, - sensorType, - spoutParallelism, - spoutNumTasks, - parserParallelism, - parserNumTasks, - errorParallelism, - errorNumTasks, - spoutConfig, - securityProtocol, - outputTopic - ); - Config stormConf = ParserOptions.getConfig(cmd); if (ParserOptions.TEST.has(cmd)) { - stormConf.put(Config.TOPOLOGY_DEBUG, true); + topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true); LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(sensorType, stormConf, builder.createTopology()); + cluster.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology()); Utils.sleep(300000); cluster.shutdown(); } else { - StormSubmitter.submitTopology(sensorType, stormConf, builder.createTopology()); + StormSubmitter.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology()); } } catch (Exception e) { e.printStackTrace(); @@ -347,13 +465,13 @@ public class ParserTopologyCLI { if(!ret.isPresent()) { ret = Optional.ofNullable((String) 
spoutConfig.get("security.protocol")); } - if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) { + if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) { ret = Optional.empty(); } return ret; } - private static Map<String, Object> readSpoutConfig(File inputFile) { + private static Map<String, Object> readJSONMapFromFile(File inputFile) { String json = null; if (inputFile.exists()) { try {
