http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
new file mode 100644
index 0000000..f1a0ec4
--- /dev/null
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoHashFunctionsTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import ch.hsr.geohash.WGS84Point;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class GeoHashFunctionsTest {
+  public static WGS84Point empireStatePoint = new WGS84Point(40.748570, 
-73.985752);
+  public static WGS84Point mosconeCenterPoint = new WGS84Point(37.782891, 
-122.404166);
+  public static WGS84Point jutlandPoint = new WGS84Point(57.64911, 10.40740);
+  public static String explicitJutlandHash = "u4pruydqmvpb";
+  String empireStateHash = 
(String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", 
empireStatePoint.getLatitude()
+                                              
,"long",empireStatePoint.getLongitude()
+                                              )
+    );
+  String mosconeCenterHash = 
(String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", 
mosconeCenterPoint.getLatitude()
+                                              
,"long",mosconeCenterPoint.getLongitude()
+                                              )
+    );
+  String jutlandHash = 
(String)StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", 
jutlandPoint.getLatitude()
+                                              
,"long",jutlandPoint.getLongitude()
+                                              )
+  );
+
+  @Test
+  public void testToLatLong_happypath() throws Exception {
+    Map<String, Object> latLong = (Map<String, 
Object>)StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+            , ImmutableMap.of("hash", explicitJutlandHash ) );
+    Assert.assertEquals(jutlandPoint.getLatitude(), 
(double)latLong.get("latitude"), 1e-3);
+    Assert.assertEquals(jutlandPoint.getLongitude(), 
(double)latLong.get("longitude"), 1e-3);
+  }
+
+  @Test
+  public void testToLatLong_degenerate() throws Exception {
+    {
+      Map<String, Object> latLong = (Map<String, Object>) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+              , ImmutableMap.of("hash", "u"));
+      Assert.assertFalse(Double.isNaN((double) latLong.get("latitude")));
+      Assert.assertFalse(Double.isNaN((double) latLong.get("longitude")));
+    }
+    {
+      Map<String, Object> latLong = (Map<String, Object>) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(hash)"
+              , ImmutableMap.of("hash", ""));
+      Assert.assertEquals(0d, (double)latLong.get("latitude"), 1e-3);
+      Assert.assertEquals(0d, (double)latLong.get("longitude"), 1e-3);
+    }
+    {
+      Map<String, Object> latLong = (Map<String, Object>) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(null)"
+              , new HashMap<>());
+      Assert.assertNull(latLong);
+    }
+  }
+
+  @Test
+  public void testHash_fromlatlong() throws Exception {
+    Assert.assertEquals("u4pruydqmv", 
StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 10)"
+                             , ImmutableMap.of("lat", 
jutlandPoint.getLatitude()
+                                              
,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmvpb", 
StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long)"
+                             , ImmutableMap.of("lat", 
jutlandPoint.getLatitude()
+                                              
,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+    Assert.assertEquals("u4pruydqmv".substring(0, 6), 
StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, long, 6)"
+                             , ImmutableMap.of("lat", 
jutlandPoint.getLatitude()
+                                              
,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat)"
+                             , ImmutableMap.of("lat", 
jutlandPoint.getLatitude()
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LATLONG(lat, 
long, 10)"
+                             , ImmutableMap.of("lat", "blah"
+                                              
,"long",jutlandPoint.getLongitude()
+                                              )
+                             )
+    );
+  }
+
+  @Test
+  public void testHash_fromLocation() throws Exception {
+    Map<String, String> loc = ImmutableMap.of( "latitude", "" + 
jutlandPoint.getLatitude()
+                                             , "longitude","" + 
jutlandPoint.getLongitude()
+                                                                     );
+    Assert.assertEquals("u4pruydqmv", 
StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmv".substring(0, 6), 
StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 6)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+
+    Assert.assertEquals("u4pruydqmvpb", 
StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
+                             , ImmutableMap.of("loc", loc
+                                              )
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc)"
+                                               , ImmutableMap.of("loc", 
ImmutableMap.of( "latitude", "57.64911" ))
+                             )
+    );
+    Assert.assertNull(StellarProcessorUtils.run("GEOHASH_FROM_LOC(loc, 10)"
+                                                , ImmutableMap.of("loc", 
ImmutableMap.of( "latitude", "blah"
+                                                                               
         , "longitude","10.40740"
+                                                                     )
+                                              )
+
+                             )
+    );
+  }
+
+  @Test
+  public void testDistanceHaversine() throws Exception {
+    testDistance(Optional.empty());
+    testDistance(Optional.of("HAVERSINE"));
+  }
+
+  @Test
+  public void testDistanceLawOfCosines() throws Exception {
+    testDistance(Optional.of("LAW_OF_COSINES"));
+  }
+
+  @Test
+  public void testDistanceLawOfVicenty() throws Exception {
+    testDistance(Optional.of("VICENTY"));
+  }
+
+  @Test
+  public void testMaxDistance_happyPath() throws Exception {
+    Double maxDistance = (double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST([empireState, mosconeCenter, 
jutland])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+
+  @Test
+  public void testMaxDistance_differentOrder() throws Exception {
+    Double maxDistance = (double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, 
empireState])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+
+  @Test
+  public void testMaxDistance_withNulls() throws Exception {
+    Double maxDistance = (double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, mosconeCenter, 
empireState, null])"
+            , ImmutableMap.of("empireState", empireStateHash
+                    , "mosconeCenter", mosconeCenterHash
+                    , "jutland", jutlandHash
+            )
+    );
+    double expectedDistance = 8528;
+    Assert.assertEquals(expectedDistance, maxDistance, 1d);
+  }
+  @Test
+  public void testMaxDistance_allSame() throws Exception {
+    Double maxDistance = (double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST([jutland, jutland, jutland])"
+            , ImmutableMap.of( "jutland", jutlandHash )
+    );
+    Assert.assertEquals(0, maxDistance, 1e-6d);
+  }
+
+  @Test
+  public void testMaxDistance_emptyList() throws Exception {
+    Double maxDistance = (double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST([])" , new HashMap<>() );
+    Assert.assertTrue(Double.isNaN(maxDistance));
+  }
+
+  @Test
+  public void testMaxDistance_nullList() throws Exception {
+    Double maxDistance = (Double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST(null)" , new HashMap<>() );
+    Assert.assertNull(maxDistance);
+  }
+
+  @Test
+  public void testMaxDistance_invalidList() throws Exception {
+    Double maxDistance = (Double) 
StellarProcessorUtils.run("GEOHASH_MAX_DIST()" , new HashMap<>() );
+    Assert.assertNull(maxDistance);
+  }
+
+  public void testDistance(Optional<String> method) throws Exception {
+    double expectedDistance = 4128; //in kilometers
+    Map<String, Object> vars = ImmutableMap.of("empireState", empireStateHash, 
"mosconeCenter", mosconeCenterHash);
+    //ensure that d(x, y) == d(y, x) and that both are the same as the 
expected (up to 1 km accuracy)
+    {
+      String stellarStatement = 
getDistStellarStatement(ImmutableList.of("mosconeCenter", "empireState"), 
method);
+      Assert.assertEquals(expectedDistance, (double) 
StellarProcessorUtils.run(stellarStatement , vars ), 1D );
+    }
+    {
+      String stellarStatement = 
getDistStellarStatement(ImmutableList.of("empireState", "mosconeCenter"), 
method);
+      Assert.assertEquals(expectedDistance, (double) 
StellarProcessorUtils.run(stellarStatement , vars ), 1D );
+    }
+  }
+
+  private static String getDistStellarStatement(List<String> hashVariables, 
Optional<String> method) {
+    if(method.isPresent()) {
+      List<String> vars = new ArrayList<>();
+      vars.addAll(hashVariables);
+      vars.add("\'" + method.get() + "\'");
+      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(vars) + ")";
+    }
+    else {
+      return "GEOHASH_DIST(" + Joiner.on(",").skipNulls().join(hashVariables) 
+ ")";
+    }
+  }
+
+  @Test
+  public void testCentroid_List() throws Exception {
+    //happy path
+    {
+      double expectedLong = -98.740087 //calculated via 
http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
+         , expectedLat = 41.86921
+         ;
+      Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, 
mosconeCenter]))"
+              , ImmutableMap.of("empireState", empireStateHash, 
"mosconeCenter", mosconeCenterHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //same point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState, 
empireState]))"
+              , ImmutableMap.of("empireState", empireStateHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //one point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([empireState]))"
+              , ImmutableMap.of("empireState", empireStateHash)
+      );
+      Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+      Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+    }
+    //no points
+    {
+      Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID([]))"
+              , new HashMap<>()
+      );
+      Assert.assertNull(centroid);
+    }
+  }
+
+  @Test
+  public void testCentroid_weighted() throws Exception {
+    //happy path
+    {
+      double expectedLong = -98.740087 //calculated via 
http://www.geomidpoint.com/ using the center of gravity or geographic midpoint.
+         , expectedLat = 41.86921
+         ;
+      for(int weight = 1;weight < 10;++weight) {
+        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, 
weight, mosconeCenterHash, weight);
+        Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+      }
+    }
+    //same point
+    {
+      double expectedLong = empireStatePoint.getLongitude()
+         , expectedLat = empireStatePoint.getLatitude()
+         ;
+      for(int weight = 1;weight < 10;++weight) {
+        Map<Object, Integer> weightedPoints = ImmutableMap.of(empireStateHash, 
weight);
+        Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+        Assert.assertEquals(expectedLong, centroid.get("longitude"), 1e-3);
+        Assert.assertEquals(expectedLat, centroid.get("latitude"), 1e-3);
+      }
+    }
+    //no points
+    {
+      Map<Object, Integer> weightedPoints = new HashMap<>();
+      Map<String, Double> centroid = (Map) 
StellarProcessorUtils.run("GEOHASH_TO_LATLONG(GEOHASH_CENTROID(weightedPoints))"
+                , ImmutableMap.of("weightedPoints", weightedPoints)
+        );
+      Assert.assertNull(centroid);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase-client/pom.xml 
b/metron-platform/metron-hbase-client/pom.xml
index 5dd6127..1237be7 100644
--- a/metron-platform/metron-hbase-client/pom.xml
+++ b/metron-platform/metron-hbase-client/pom.xml
@@ -80,6 +80,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     
<pattern>org.apache.commons.logging</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml 
b/metron-platform/metron-indexing/pom.xml
index c64c374..7d07665 100644
--- a/metron-platform/metron-indexing/pom.xml
+++ b/metron-platform/metron-indexing/pom.xml
@@ -222,6 +222,16 @@
                         <configuration>
                             
<shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index ddb88e5..4f47a65 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.indexing.dao;
 
-import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 
 import java.util.HashMap;
@@ -26,6 +25,7 @@ import java.util.function.Supplier;
 
 public class AccessConfig {
   private Integer maxSearchResults;
+  private Integer maxSearchGroups;
   private Supplier<Map<String, Object>> globalConfigSupplier;
   private Map<String, String> optionalSettings = new HashMap<>();
   private TableProvider tableProvider = null;
@@ -55,6 +55,18 @@ public class AccessConfig {
   }
 
   /**
+   * The maximum search groups.
+   * @return
+   */
+  public Integer getMaxSearchGroups() {
+    return maxSearchGroups;
+  }
+
+  public void setMaxSearchGroups(Integer maxSearchGroups) {
+    this.maxSearchGroups = maxSearchGroups;
+  }
+
+  /**
    * Get optional settings for initializing indices.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
index a1cf398..c890544 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -65,6 +67,11 @@ public class HBaseDao implements IndexDao {
   }
 
   @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    return null;
+  }
+
+  @Override
   public synchronized void init(AccessConfig config) {
     if(this.tableInterface == null) {
       this.config = config;

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 350e402..745dccd 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.flipkart.zjsonpatch.JsonPatch;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -33,7 +35,6 @@ import 
org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import java.io.IOException;
 import org.apache.metron.indexing.dao.search.FieldType;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +50,8 @@ public interface IndexDao {
    */
   SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException;
 
+  GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException;
+
   /**
    * Initialize the DAO with the AccessConfig object.
    * @param config

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index e9a4a9a..61c6231 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -22,6 +22,8 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -118,6 +120,17 @@ public class MultiIndexDao implements IndexDao {
   }
 
   @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    for(IndexDao dao : indices) {
+      GroupResponse s = dao.group(groupRequest);
+      if(s != null) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  @Override
   public void init(AccessConfig config) {
     for(IndexDao dao : indices) {
       dao.init(config);

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
new file mode 100644
index 0000000..be02026
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/Group.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+public class Group {
+
+  private GroupOrder order;
+  private String field;
+
+  public Group() {
+    order = new GroupOrder();
+    order.setGroupOrderType(GroupOrderType.TERM.toString());
+    order.setSortOrder(SortOrder.DESC.toString());
+  }
+
+  public GroupOrder getOrder() {
+    return order;
+  }
+
+  public void setOrder(GroupOrder order) {
+    this.order = order;
+  }
+
+  public String getField() {
+    return field;
+  }
+
+  public void setField(String field) {
+    this.field = field;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
new file mode 100644
index 0000000..b90c438
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+public class GroupOrder {
+
+  private SortOrder sortOrder;
+  private GroupOrderType groupOrderType;
+
+  public SortOrder getSortOrder() {
+    return sortOrder;
+  }
+
+  public void setSortOrder(String sortOrder) {
+    this.sortOrder = SortOrder.fromString(sortOrder);
+  }
+
+  public GroupOrderType getGroupOrderType() {
+    return groupOrderType;
+  }
+
+  public void setGroupOrderType(String groupOrderType) {
+    this.groupOrderType = GroupOrderType.fromString(groupOrderType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
new file mode 100644
index 0000000..8444e50
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupOrderType.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum GroupOrderType {
+
+  @JsonProperty("count")
+  COUNT("count"),
+  @JsonProperty("term")
+  TERM("term");
+
+  private String groupOrderType;
+
+  GroupOrderType(String groupOrderType) {
+    this.groupOrderType = groupOrderType;
+  }
+
+  public String getGroupOrderType() {
+    return groupOrderType;
+  }
+
+  public static GroupOrderType fromString(String groupOrderType) {
+    return GroupOrderType.valueOf(groupOrderType.toUpperCase());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
new file mode 100644
index 0000000..121da10
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupRequest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.List;
+import java.util.Optional;
+
+public class GroupRequest {
+
+  private List<String> indices;
+  private String query;
+  private String scoreField;
+  private List<Group> groups;
+
+  public List<String> getIndices() {
+    return indices;
+  }
+
+  public void setIndices(List<String> indices) {
+    this.indices = indices;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public Optional<String> getScoreField() {
+    return scoreField == null ? Optional.empty() : Optional.of(scoreField);
+  }
+
+  public void setScoreField(String scoreField) {
+    this.scoreField = scoreField;
+  }
+
+  public List<Group> getGroups() {
+    return groups;
+  }
+
+  public void setGroups(List<Group> groups) {
+    this.groups = groups;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
new file mode 100644
index 0000000..1b42609
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResponse.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.util.List;
+
+public class GroupResponse {
+
+  private String groupedBy;
+  private List<GroupResult> groupResults;
+
+  public String getGroupedBy() {
+    return groupedBy;
+  }
+
+  public void setGroupedBy(String groupedBy) {
+    this.groupedBy = groupedBy;
+  }
+
+  public List<GroupResult> getGroupResults() {
+    return groupResults;
+  }
+
+  public void setGroupResults(List<GroupResult> groupResults) {
+    this.groupResults = groupResults;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
new file mode 100644
index 0000000..d40f146
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GroupResult.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.search;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.List;
+
+public class GroupResult {
+
+  private String key;
+  private long total;
+  private Double score;
+  private String groupedBy;
+  private List<GroupResult> groupResults;
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  public long getTotal() {
+    return total;
+  }
+
+  public void setTotal(long total) {
+    this.total = total;
+  }
+
+  public Double getScore() {
+    return score;
+  }
+
+  public void setScore(Double score) {
+    this.score = score;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getGroupedBy() {
+    return groupedBy;
+  }
+
+  public void setGroupedBy(String groupedBy) {
+    this.groupedBy = groupedBy;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public List<GroupResult> getGroupResults() {
+    return groupResults;
+  }
+
+  public void setGroupResults(List<GroupResult> groups) {
+    this.groupResults = groups;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index 2d146e0..6e48b58 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -22,7 +22,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
 import org.apache.metron.indexing.dao.update.Document;
@@ -77,6 +76,27 @@ public class InMemoryDao implements IndexDao {
     return ret;
   }
 
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    GroupResponse groupResponse = new GroupResponse();
+    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
+    groupResponse.setGroupResults(getGroupResults(groupRequest.getGroups(), 
0));
+    return groupResponse;
+  }
+
+  private List<GroupResult> getGroupResults(List<Group> groups, int index) {
+    Group group = groups.get(index);
+    GroupResult groupResult = new GroupResult();
+    groupResult.setKey(group.getField() + "_value");
+    if (index < groups.size() - 1) {
+      groupResult.setGroupedBy(groups.get(index + 1).getField());
+      groupResult.setGroupResults(getGroupResults(groups, index + 1));
+    } else {
+      groupResult.setScore(50.0);
+    }
+    groupResult.setTotal(10);
+    return Collections.singletonList(groupResult);
+  }
 
   private static class ComparableComparator implements Comparator<Comparable>  
{
     SortOrder order = null;

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 2645df2..0db8e37 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -20,10 +20,13 @@ package org.apache.metron.indexing.dao;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.GroupResult;
 import org.apache.metron.integration.InMemoryComponent;
 import org.junit.After;
 import org.junit.Assert;
@@ -40,11 +43,11 @@ import java.util.Map;
 public abstract class SearchIntegrationTest {
   /**
    * [
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 1", "duplicate_name_field": "data 1"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 
1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": 
"bro data 2", "duplicate_name_field": "data 2"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 
1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro 
data 3", "duplicate_name_field": "data 3"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 
1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 4", "duplicate_name_field": "data 4"},
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 5", "duplicate_name_field": "data 5"}
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"long_field": 10000, "timestamp":1, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", 
"duplicate_name_field": "data 1"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"long_field": 20000, "timestamp":2, "latitude": 48.0001, "score": 50.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 
2", "duplicate_name_field": "data 2"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"long_field": 10000, "timestamp":3, "latitude": 48.5839, "score": 20.0, 
"is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", 
"duplicate_name_field": "data 3"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"long_field": 10000, "timestamp":4, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", 
"duplicate_name_field": "data 4"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"long_field": 10000, "timestamp":5, "latitude": 48.5839, "score": 98.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", 
"duplicate_name_field": "data 5"}
    * ]
    */
   @Multiline
@@ -52,11 +55,11 @@ public abstract class SearchIntegrationTest {
 
   /**
    * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, 
"duplicate_name_field": 1},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 
1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 
20, "duplicate_name_field": 2},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 
30, "duplicate_name_field": 3},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 
1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 
40, "duplicate_name_field": 4},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 
50, "duplicate_name_field": 5}
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, 
"is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, 
"duplicate_name_field": 1},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, 
"duplicate_name_field": 2},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, 
"duplicate_name_field": 3},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, 
"duplicate_name_field": 4},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, 
"duplicate_name_field": 5}
    * ]
    */
   @Multiline
@@ -149,7 +152,7 @@ public abstract class SearchIntegrationTest {
 
   /**
    * {
-   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", 
"long_field", "timestamp", "latitude", "double_field", "is_alert"],
+   * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", 
"long_field", "timestamp", "latitude", "score", "is_alert"],
    * "indices": ["bro", "snort"],
    * "query": "*",
    * "from": 0,
@@ -253,6 +256,63 @@ public abstract class SearchIntegrationTest {
   @Multiline
   public static String noResultsFieldsQuery;
 
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"is_alert"
+   *   },
+   *   {
+   *     "field":"latitude"
+   *   }
+   * ],
+   * "scoreField":"score",
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String groupByQuery;
+
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"is_alert",
+   *     "order": {
+   *       "groupOrderType": "count",
+   *       "sortOrder": "ASC"
+   *     }
+   *   },
+   *   {
+   *     "field":"ip_src_addr",
+   *     "order": {
+   *       "groupOrderType": "term",
+   *       "sortOrder": "DESC"
+   *     }
+   *   }
+   * ],
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String sortedGroupByQuery;
+
+  /**
+   * {
+   * "groups": [
+   *   {
+   *     "field":"location_point"
+   *   }
+   * ],
+   * "indices": ["bro", "snort"],
+   * "query": "*"
+   * }
+   */
+  @Multiline
+  public static String badGroupQuery;
+
   protected static IndexDao dao;
   protected static InMemoryComponent indexComponent;
 
@@ -387,14 +447,18 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 
0.00001);
       Assert.assertEquals(new Long(2), 
latitudeCounts.get(latitudeKeys.get(0)));
       Assert.assertEquals(new Long(8), 
latitudeCounts.get(latitudeKeys.get(1)));
-      Map<String, Long> doubleFieldCounts = facetCounts.get("double_field");
-      Assert.assertEquals(2, doubleFieldCounts.size());
-      List<String> doubleFieldKeys = new 
ArrayList<>(doubleFieldCounts.keySet());
-      Collections.sort(doubleFieldKeys);
-      Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 
0.00001);
-      Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 
0.00001);
-      Assert.assertEquals(new Long(5), 
doubleFieldCounts.get(doubleFieldKeys.get(0)));
-      Assert.assertEquals(new Long(5), 
doubleFieldCounts.get(doubleFieldKeys.get(1)));
+      Map<String, Long> scoreFieldCounts = facetCounts.get("score");
+      Assert.assertEquals(4, scoreFieldCounts.size());
+      List<String> scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet());
+      Collections.sort(scoreFieldKeys);
+      Assert.assertEquals(10.0, Double.parseDouble(scoreFieldKeys.get(0)), 
0.00001);
+      Assert.assertEquals(20.0, Double.parseDouble(scoreFieldKeys.get(1)), 
0.00001);
+      Assert.assertEquals(50.0, Double.parseDouble(scoreFieldKeys.get(2)), 
0.00001);
+      Assert.assertEquals(98.0, Double.parseDouble(scoreFieldKeys.get(3)), 
0.00001);
+      Assert.assertEquals(new Long(4), 
scoreFieldCounts.get(scoreFieldKeys.get(0)));
+      Assert.assertEquals(new Long(2), 
scoreFieldCounts.get(scoreFieldKeys.get(1)));
+      Assert.assertEquals(new Long(3), 
scoreFieldCounts.get(scoreFieldKeys.get(2)));
+      Assert.assertEquals(new Long(1), 
scoreFieldCounts.get(scoreFieldKeys.get(3)));
       Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
       Assert.assertEquals(2, isAlertCounts.size());
       Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
@@ -440,7 +504,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, broTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point"));
       Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
@@ -453,7 +517,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point"));
       Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
@@ -486,7 +550,7 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
       Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
       Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
     }
@@ -527,6 +591,145 @@ public abstract class SearchIntegrationTest {
       SearchResponse response = dao.search(request);
       Assert.assertEquals(0, response.getTotal());
     }
+    // Group by test case, default order is count descending
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, 
GroupRequest.class);
+      GroupResponse response = dao.group(request);
+      Assert.assertEquals("is_alert", response.getGroupedBy());
+      List<GroupResult> isAlertGroups = response.getGroupResults();
+      Assert.assertEquals(2, isAlertGroups.size());
+
+      // isAlert == true group
+      GroupResult trueGroup = isAlertGroups.get(0);
+      Assert.assertEquals("true", trueGroup.getKey());
+      Assert.assertEquals(6, trueGroup.getTotal());
+      Assert.assertEquals("latitude", trueGroup.getGroupedBy());
+      Assert.assertEquals(198.0, trueGroup.getScore(), 0.00001);
+      List<GroupResult> trueLatitudeGroups = trueGroup.getGroupResults();
+      Assert.assertEquals(2, trueLatitudeGroups.size());
+
+
+      // isAlert == true && latitude == 48.5839 group
+      GroupResult trueLatitudeGroup2 = trueLatitudeGroups.get(0);
+      Assert.assertEquals(48.5839, 
Double.parseDouble(trueLatitudeGroup2.getKey()), 0.00001);
+      Assert.assertEquals(5, trueLatitudeGroup2.getTotal());
+      Assert.assertEquals(148.0, trueLatitudeGroup2.getScore(), 0.00001);
+
+      // isAlert == true && latitude == 48.0001 group
+      GroupResult trueLatitudeGroup1 = trueLatitudeGroups.get(1);
+      Assert.assertEquals(48.0001, 
Double.parseDouble(trueLatitudeGroup1.getKey()), 0.00001);
+      Assert.assertEquals(1, trueLatitudeGroup1.getTotal());
+      Assert.assertEquals(50.0, trueLatitudeGroup1.getScore(), 0.00001);
+
+      // isAlert == false group
+      GroupResult falseGroup = isAlertGroups.get(1);
+      Assert.assertEquals("false", falseGroup.getKey());
+      Assert.assertEquals("latitude", falseGroup.getGroupedBy());
+      Assert.assertEquals(130.0, falseGroup.getScore(), 0.00001);
+      List<GroupResult> falseLatitudeGroups = falseGroup.getGroupResults();
+      Assert.assertEquals(2, falseLatitudeGroups.size());
+
+      // isAlert == false && latitude == 48.5839 group
+      GroupResult falseLatitudeGroup2 = falseLatitudeGroups.get(0);
+      Assert.assertEquals(48.5839, 
Double.parseDouble(falseLatitudeGroup2.getKey()), 0.00001);
+      Assert.assertEquals(3, falseLatitudeGroup2.getTotal());
+      Assert.assertEquals(80.0, falseLatitudeGroup2.getScore(), 0.00001);
+
+      // isAlert == false && latitude == 48.0001 group
+      GroupResult falseLatitudeGroup1 = falseLatitudeGroups.get(1);
+      Assert.assertEquals(48.0001, 
Double.parseDouble(falseLatitudeGroup1.getKey()), 0.00001);
+      Assert.assertEquals(1, falseLatitudeGroup1.getTotal());
+      Assert.assertEquals(50.0, falseLatitudeGroup1.getScore(), 0.00001);
+    }
+    // Group by with sorting test case where is_alert is sorted by count 
ascending and ip_src_addr is sorted by term descending
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(sortedGroupByQuery, 
GroupRequest.class);
+      GroupResponse response = dao.group(request);
+      Assert.assertEquals("is_alert", response.getGroupedBy());
+      List<GroupResult> isAlertGroups = response.getGroupResults();
+      Assert.assertEquals(2, isAlertGroups.size());
+
+      // isAlert == false group
+      GroupResult falseGroup = isAlertGroups.get(0);
+      Assert.assertEquals(4, falseGroup.getTotal());
+      Assert.assertEquals("ip_src_addr", falseGroup.getGroupedBy());
+      List<GroupResult> falseIpSrcAddrGroups = falseGroup.getGroupResults();
+      Assert.assertEquals(4, falseIpSrcAddrGroups.size());
+
+      // isAlert == false && ip_src_addr == 192.168.1.8 group
+      GroupResult falseIpSrcAddrGroup1 = falseIpSrcAddrGroups.get(0);
+      Assert.assertEquals("192.168.1.8", falseIpSrcAddrGroup1.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup1.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup1.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup1.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.7 group
+      GroupResult falseIpSrcAddrGroup2 = falseIpSrcAddrGroups.get(1);
+      Assert.assertEquals("192.168.1.7", falseIpSrcAddrGroup2.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup2.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup2.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup2.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.6 group
+      GroupResult falseIpSrcAddrGroup3 = falseIpSrcAddrGroups.get(2);
+      Assert.assertEquals("192.168.1.6", falseIpSrcAddrGroup3.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup3.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup3.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup3.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.2 group
+      GroupResult falseIpSrcAddrGroup4 = falseIpSrcAddrGroups.get(3);
+      Assert.assertEquals("192.168.1.2", falseIpSrcAddrGroup4.getKey());
+      Assert.assertEquals(1, falseIpSrcAddrGroup4.getTotal());
+      Assert.assertNull(falseIpSrcAddrGroup4.getGroupedBy());
+      Assert.assertNull(falseIpSrcAddrGroup4.getGroupResults());
+
+      // isAlert == false group
+      GroupResult trueGroup = isAlertGroups.get(1);
+      Assert.assertEquals(6, trueGroup.getTotal());
+      Assert.assertEquals("ip_src_addr", trueGroup.getGroupedBy());
+      List<GroupResult> trueIpSrcAddrGroups = trueGroup.getGroupResults();
+      Assert.assertEquals(4, trueIpSrcAddrGroups.size());
+
+      // isAlert == false && ip_src_addr == 192.168.1.5 group
+      GroupResult trueIpSrcAddrGroup1 = trueIpSrcAddrGroups.get(0);
+      Assert.assertEquals("192.168.1.5", trueIpSrcAddrGroup1.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup1.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup1.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup1.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.4 group
+      GroupResult trueIpSrcAddrGroup2 = trueIpSrcAddrGroups.get(1);
+      Assert.assertEquals("192.168.1.4", trueIpSrcAddrGroup2.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup2.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup2.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup2.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.3 group
+      GroupResult trueIpSrcAddrGroup3 = trueIpSrcAddrGroups.get(2);
+      Assert.assertEquals("192.168.1.3", trueIpSrcAddrGroup3.getKey());
+      Assert.assertEquals(1, trueIpSrcAddrGroup3.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup3.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup3.getGroupResults());
+
+      // isAlert == false && ip_src_addr == 192.168.1.1 group
+      GroupResult trueIpSrcAddrGroup4 = trueIpSrcAddrGroups.get(3);
+      Assert.assertEquals("192.168.1.1", trueIpSrcAddrGroup4.getKey());
+      Assert.assertEquals(3, trueIpSrcAddrGroup4.getTotal());
+      Assert.assertNull(trueIpSrcAddrGroup4.getGroupedBy());
+      Assert.assertNull(trueIpSrcAddrGroup4.getGroupResults());
+    }
+    //Bad group query
+    {
+      GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, 
GroupRequest.class);
+      try {
+        dao.group(request);
+        Assert.fail("Exception expected, but did not come.");
+      }
+      catch(InvalidSearchException ise) {
+        Assert.assertEquals("Could not execute search", ise.getMessage());
+      }
+    }
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/pom.xml 
b/metron-platform/metron-management/pom.xml
index 4117d69..a5cae38 100644
--- a/metron-platform/metron-management/pom.xml
+++ b/metron-platform/metron-management/pom.xml
@@ -205,6 +205,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 7574418..bf3342c 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -82,7 +82,15 @@ public class ConfigurationFunctionsTest {
       "parserConfig" : { },
       "fieldTransformations" : [ ],
       "readMetadata":false,
-      "mergeMetadata":false
+      "mergeMetadata":false,
+      "parserParallelism" : 1,
+      "errorWriterParallelism" : 1,
+      "spoutNumTasks" : 1,
+      "stormConfig" : {},
+      "errorWriterNumTasks":1,
+      "spoutConfig":{},
+      "parserNumTasks":1,
+      "spoutParallelism":1
     }
    */
   @Multiline

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index ea4f1dd..d4a984d 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -103,6 +103,17 @@ then it is assumed to be a regex and will match any topic 
matching the pattern (
 * `mergeMetadata` : Boolean indicating whether to merge metadata with the 
message or not (`false` by default).  See below for a discussion about metadata.
 * `parserConfig` : A JSON Map representing the parser implementation specific 
configuration.
 * `fieldTransformations` : An array of complex objects representing the 
transformations to be done on the message generated from the parser before 
writing out to the kafka topic.
+* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can 
be overridden on the command line.
+* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This 
can be overridden on the command line.
+* `parserParallelism` : The parser bolt parallelism (default to `1`). This can 
be overridden on the command line.
+* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). 
This can be overridden on the command line.
+* `errorWriterParallelism` : The error writer bolt parallelism (default to 
`1`). This can be overridden on the command line.
+* `errorWriterNumTasks` : The number of tasks for the error writer bolt 
(default to `1`). This can be overridden on the command line.
+* `numWorkers` : The number of workers to use in the topology (default is the 
storm default of `1`).
+* `numAckers` : The number of acker executors to use in the topology (default 
is the storm default of `1`).
+* `spoutConfig` : A map representing a custom spout config (this is a map). 
This can be overridden on the command line.
+* `securityProtocol` : The security protocol to use for reading from kafka 
(this is a string).  This can be overridden on the command line and also 
specified in the spout config via the `security.protocol` key.  If both are 
specified, then they are merged and the CLI will take precedence.
+* `stormConfig` : The storm config to use (this is a map).  This can be 
overridden on the command line.  If both are specified, they are merged with 
CLI properties taking precedence.
 
 The `fieldTransformations` is a complex object which defines a
 transformation which can be done to a message.  This transformation can 

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml 
b/metron-platform/metron-parsers/pom.xml
index b3b5657..b80343a 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -276,6 +276,16 @@
                         <configuration>
                             
<shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.fasterxml.jackson</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 53d3d99..6c0dc28 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -18,9 +18,11 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
+import org.apache.storm.Config;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
@@ -47,34 +49,57 @@ import java.util.*;
  */
 public class ParserTopologyBuilder {
 
+  public static class ParserTopology {
+    private TopologyBuilder builder;
+    private Config topologyConfig;
+
+    private ParserTopology(TopologyBuilder builder, Config topologyConfig) {
+      this.builder = builder;
+      this.topologyConfig = topologyConfig;
+    }
+
+
+    public TopologyBuilder getBuilder() {
+      return builder;
+    }
+
+    public Config getTopologyConfig() {
+      return topologyConfig;
+    }
+  }
+
   /**
    * Builds a Storm topology that parses telemetry data received from an 
external sensor.
    *
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
    * @param sensorType               Type of sensor
-   * @param spoutParallelism         Parallelism hint for the spout
-   * @param spoutNumTasks            Number of tasks for the spout
-   * @param parserParallelism        Parallelism hint for the parser bolt
-   * @param parserNumTasks           Number of tasks for the parser bolt
-   * @param errorWriterParallelism   Parallelism hint for the bolt that 
handles errors
-   * @param errorWriterNumTasks      Number of tasks for the bolt that handles 
errors
-   * @param kafkaSpoutConfig         Configuration options for the kafka spout
+   * @param spoutParallelismSupplier         Supplier for the parallelism hint 
for the spout
+   * @param spoutNumTasksSupplier            Supplier for the number of tasks 
for the spout
+   * @param parserParallelismSupplier        Supplier for the parallelism hint 
for the parser bolt
+   * @param parserNumTasksSupplier           Supplier for the number of tasks 
for the parser bolt
+   * @param errorWriterParallelismSupplier   Supplier for the parallelism hint 
for the bolt that handles errors
+   * @param errorWriterNumTasksSupplier      Supplier for the number of tasks 
for the bolt that handles errors
+   * @param kafkaSpoutConfigSupplier         Supplier for the configuration 
options for the kafka spout
+   * @param securityProtocolSupplier         Supplier for the security protocol
+   * @param outputTopic                      The output kafka topic
+   * @param stormConfigSupplier              Supplier for the storm config
    * @return A Storm topology that parses telemetry data received from an 
external sensor
    * @throws Exception
    */
-  public static TopologyBuilder build(String zookeeperUrl,
+  public static ParserTopology build(String zookeeperUrl,
                                       Optional<String> brokerUrl,
                                       String sensorType,
-                                      int spoutParallelism,
-                                      int spoutNumTasks,
-                                      int parserParallelism,
-                                      int parserNumTasks,
-                                      int errorWriterParallelism,
-                                      int errorWriterNumTasks,
-                                      Map<String, Object> kafkaSpoutConfig,
-                                      Optional<String> securityProtocol,
-                                      Optional<String> outputTopic
+                                      ValueSupplier<Integer> 
spoutParallelismSupplier,
+                                      ValueSupplier<Integer> 
spoutNumTasksSupplier,
+                                      ValueSupplier<Integer> 
parserParallelismSupplier,
+                                      ValueSupplier<Integer> 
parserNumTasksSupplier,
+                                      ValueSupplier<Integer> 
errorWriterParallelismSupplier,
+                                      ValueSupplier<Integer> 
errorWriterNumTasksSupplier,
+                                      ValueSupplier<Map> 
kafkaSpoutConfigSupplier,
+                                      ValueSupplier<String> 
securityProtocolSupplier,
+                                      Optional<String> outputTopic,
+                                      ValueSupplier<Config> stormConfigSupplier
   ) throws Exception {
 
 
@@ -82,6 +107,14 @@ public class ParserTopologyBuilder {
     // fetch configuration from zookeeper
     ParserConfigurations configs = new ParserConfigurations();
     SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, 
sensorType, configs);
+    int spoutParallelism = spoutParallelismSupplier.get(parserConfig, 
Integer.class);
+    int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class);
+    int parserParallelism = parserParallelismSupplier.get(parserConfig, 
Integer.class);
+    int parserNumTasks = parserNumTasksSupplier.get(parserConfig, 
Integer.class);
+    int errorWriterParallelism = 
errorWriterParallelismSupplier.get(parserConfig, Integer.class);
+    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, 
Integer.class);
+    Map<String, Object> kafkaSpoutConfig = 
kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
+    Optional<String> securityProtocol = 
Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
@@ -103,7 +136,7 @@ public class ParserTopologyBuilder {
               .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    return builder;
+    return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, 
Config.class));
   }
 
   /**
@@ -243,16 +276,16 @@ public class ParserTopologyBuilder {
    * @throws Exception
    */
   private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, 
String sensorType, ParserConfigurations configs) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
-    client.start();
-    ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
-    SensorParserConfig parserConfig = 
configs.getSensorParserConfig(sensorType);
-    if (parserConfig == null) {
-      throw new IllegalStateException("Cannot find the parser configuration in 
zookeeper for " + sensorType + "." +
-              "  Please check that it exists in zookeeper by using the 
'zk_load_configs.sh -m DUMP' command.");
+    try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) 
{
+      client.start();
+      ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
+      SensorParserConfig parserConfig = 
configs.getSensorParserConfig(sensorType);
+      if (parserConfig == null) {
+        throw new IllegalStateException("Cannot find the parser configuration 
in zookeeper for " + sensorType + "." +
+                "  Please check that it exists in zookeeper by using the 
'zk_load_configs.sh -m DUMP' command.");
+      }
+      return parserConfig;
     }
-    client.close();
-    return parserConfig;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 8ff4f93..b5ee628 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -18,10 +18,14 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.metron.common.Constants;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -235,12 +239,19 @@ public class ParserTopologyCLI {
       return has(cli)?cli.getOptionValue(shortCode):def;
     }
 
-    public static Config getConfig(CommandLine cli) {
-      Config config = new Config();
+    public static Optional<Config> getConfig(CommandLine cli) {
+      return getConfig(cli, new Config());
+    }
+
+    public static Optional<Config> getConfig(CommandLine cli, Config config) {
+      if(EXTRA_OPTIONS.has(cli)) {
+        Map<String, Object> extraOptions = readJSONMapFromFile(new 
File(EXTRA_OPTIONS.get(cli)));
+        config.putAll(extraOptions);
+      }
       for(ParserOptions option : ParserOptions.values()) {
         config = option.configHandler.apply(new Arg(config, option.get(cli)));
       }
-      return config;
+      return config.isEmpty()?Optional.empty():Optional.of(config);
     }
 
     public static CommandLine parse(CommandLineParser parser, String[] args) 
throws ParseException {
@@ -273,65 +284,172 @@ public class ParserTopologyCLI {
     }
   }
 
+  private static CommandLine parse(Options options, String[] args) {
+    CommandLineParser parser = new PosixParser();
+    try {
+      return ParserOptions.parse(parser, args);
+    } catch (ParseException pe) {
+      pe.printStackTrace();
+      final HelpFormatter usageFormatter = new HelpFormatter();
+      usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true);
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  public ParserTopologyBuilder.ParserTopology createParserTopology(final 
CommandLine cmd) throws Exception {
+    String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
+    Optional<String> brokerUrl = 
ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+    String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+
+    /*
+    It bears mentioning why we're creating this ValueSupplier indirection here.
+    As a separation of responsibilities, the CLI class defines the order of 
precedence
+    for the various topological and structural properties for creating a 
parser.  This is
+    desirable because there are now (i.e. integration tests)
+    and may be in the future (i.e. a REST service to start parsers without 
using the CLI)
+    other mechanisms to construct parser topologies.  It's sensible to split 
those concerns..
+
+    Unfortunately, determining the structural parameters for a parser requires 
interacting with
+    external services (e.g. zookeeper) that are set up well within the 
ParserTopology class.
+    Rather than pulling the infrastructure to interact with those services out 
and moving it into the
+    CLI class and breaking that separation of concerns, we've created a 
supplier
+    indirection where are providing the logic as to how to create precedence 
in the CLI class
+    without owning the responsibility of constructing the infrastructure where 
the values are
+    necessarily supplied.
+
+     */
+    ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
+    };
+    ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
+    };
+    ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, 
"1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
+      }
+      return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
+        return 
Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
+    };
+
+    ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
+      if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
+        return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, 
"1"));
+      }
+      return 
Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
+    };
+
+    ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
+      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
+        return readJSONMapFromFile(new 
File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+      }
+      return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new 
HashMap<>());
+    };
+
+    ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
+      Optional<String> sp = Optional.empty();
+      if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
+        sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd));
+      }
+      if (!sp.isPresent()) {
+        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class));
+      }
+      return 
sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
+    };
+
+    ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
+      Map<String, Object> c = parserConfig.getStormConfig();
+      Config finalConfig = new Config();
+      if(c != null && !c.isEmpty()) {
+        finalConfig.putAll(c);
+      }
+      if(parserConfig.getNumAckers() != null) {
+        Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
+      }
+      if(parserConfig.getNumWorkers() != null) {
+        Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+      }
+      return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
+    };
+
+    Optional<String> outputTopic = 
ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+
+    return getParserTopology(zookeeperUrl, brokerUrl, sensorType, 
spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, 
errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, 
outputTopic);
+  }
+
+  protected ParserTopologyBuilder.ParserTopology getParserTopology( String 
zookeeperUrl
+                                                                  , 
Optional<String> brokerUrl
+                                                                  , String 
sensorType
+                                                                  , 
ValueSupplier<Integer> spoutParallelism
+                                                                  , 
ValueSupplier<Integer> spoutNumTasks
+                                                                  , 
ValueSupplier<Integer> parserParallelism
+                                                                  , 
ValueSupplier<Integer> parserNumTasks
+                                                                  , 
ValueSupplier<Integer> errorParallelism
+                                                                  , 
ValueSupplier<Integer> errorNumTasks
+                                                                  , 
ValueSupplier<Map> spoutConfig
+                                                                  , 
ValueSupplier<String> securityProtocol
+                                                                  , 
ValueSupplier<Config> stormConf
+                                                                  , 
Optional<String> outputTopic
+                                                                  ) throws 
Exception
+  {
+    return ParserTopologyBuilder.build(zookeeperUrl,
+                brokerUrl,
+                sensorType,
+                spoutParallelism,
+                spoutNumTasks,
+                parserParallelism,
+                parserNumTasks,
+                errorParallelism,
+                errorNumTasks,
+                spoutConfig,
+                securityProtocol,
+                outputTopic,
+                stormConf
+        );
+  }
+
+
   public static void main(String[] args) {
-    Options options = new Options();
 
     try {
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = null;
-      try {
-        cmd = ParserOptions.parse(parser, args);
-      } catch (ParseException pe) {
-        pe.printStackTrace();
-        final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printHelp("ParserTopologyCLI", null, options, null, 
true);
-        System.exit(-1);
-      }
+      Options options = new Options();
+      final CommandLine cmd = parse(options, args);
       if (cmd.hasOption("h")) {
         final HelpFormatter usageFormatter = new HelpFormatter();
         usageFormatter.printHelp("ParserTopologyCLI", null, options, null, 
true);
         System.exit(0);
       }
-      String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);;
-      Optional<String> brokerUrl = 
ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
+      ParserTopologyCLI cli = new ParserTopologyCLI();
+      ParserTopologyBuilder.ParserTopology topology = 
cli.createParserTopology(cmd);
       String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
-      int spoutParallelism = 
Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
-      int spoutNumTasks = 
Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
-      int parserParallelism = 
Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
-      int parserNumTasks= 
Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
-      int errorParallelism = 
Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
-      int errorNumTasks= 
Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
-      int invalidParallelism = 
Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
-      int invalidNumTasks= 
Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
-      Map<String, Object> spoutConfig = new HashMap<>();
-      if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
-        spoutConfig = readSpoutConfig(new 
File(ParserOptions.SPOUT_CONFIG.get(cmd)));
-      }
-      Optional<String> outputTopic = 
ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
-      Optional<String> securityProtocol = 
ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
-      securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
-      TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
-              brokerUrl,
-              sensorType,
-              spoutParallelism,
-              spoutNumTasks,
-              parserParallelism,
-              parserNumTasks,
-              errorParallelism,
-              errorNumTasks,
-              spoutConfig,
-              securityProtocol,
-              outputTopic
-      );
-      Config stormConf = ParserOptions.getConfig(cmd);
       if (ParserOptions.TEST.has(cmd)) {
-        stormConf.put(Config.TOPOLOGY_DEBUG, true);
+        topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorType, stormConf, 
builder.createTopology());
+        cluster.submitTopology(sensorType, topology.getTopologyConfig(), 
topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorType, stormConf, 
builder.createTopology());
+        StormSubmitter.submitTopology(sensorType, 
topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -347,13 +465,13 @@ public class ParserTopologyCLI {
     if(!ret.isPresent()) {
       ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
     }
-    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) {
       ret = Optional.empty();
     }
     return ret;
   }
 
-  private static Map<String, Object> readSpoutConfig(File inputFile) {
+  private static Map<String, Object> readJSONMapFromFile(File inputFile) {
     String json = null;
     if (inputFile.exists()) {
       try {

Reply via email to