http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java
new file mode 100644
index 0000000..efda639
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.ParseMessageRequest;
+import org.json.simple.JSONObject;
+
+import java.util.Map;
+
+/**
+ * Service contract for working with {@link SensorParserConfig} objects.
+ *
+ * <p>NOTE(review): only the signatures are visible in this file; the per-method
+ * notes below are read off the names and types and should be confirmed against
+ * the implementing class.
+ */
+public interface SensorParserConfigService {
+
+  /**
+   * @param sensorParserConfig the parser configuration to save
+   * @return a parser configuration (presumably the one that was saved)
+   * @throws RestException if the operation fails
+   */
+  SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException;
+
+  /**
+   * @param name identifier used for the lookup (presumably the sensor name)
+   * @return the matching parser configuration
+   * @throws RestException if the operation fails
+   */
+  SensorParserConfig findOne(String name) throws RestException;
+
+  /**
+   * @return every parser configuration known to the service
+   * @throws RestException if the operation fails
+   */
+  Iterable<SensorParserConfig> getAll() throws RestException;
+
+  /**
+   * @param name identifier of the configuration to delete
+   * @return a success flag; its exact meaning is defined by the implementation
+   * @throws RestException if the operation fails
+   */
+  boolean delete(String name) throws RestException;
+
+  /**
+   * @return a string-to-string map describing the available parsers
+   */
+  Map<String, String> getAvailableParsers();
+
+  /**
+   * @return a string-to-string map describing the available parsers after a reload
+   */
+  Map<String, String> reloadAvailableParsers();
+
+  /**
+   * @param parseMessageRequest the request describing what to parse
+   * @return the parse result as a JSON object
+   * @throws RestException if the operation fails
+   */
+  JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
new file mode 100644
index 0000000..8c1e228
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.TopologyResponse;
+
+import java.util.Map;
+
+/**
+ * Service contract for starting and stopping the parser, enrichment and
+ * indexing topologies, and for querying the Storm client status.
+ *
+ * <p>NOTE(review): descriptions are derived from the signatures only; confirm
+ * details against the implementing class.
+ */
+public interface StormAdminService {
+
+  /**
+   * @param name name identifying the parser topology to start
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse startParserTopology(String name) throws RestException;
+
+  /**
+   * @param name name identifying the parser topology to stop
+   * @param stopNow flag passed through to the stop operation (presumably "skip the wait period")
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException;
+
+  /**
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse startEnrichmentTopology() throws RestException;
+
+  /**
+   * @param stopNow flag passed through to the stop operation
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException;
+
+  /**
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse startIndexingTopology() throws RestException;
+
+  /**
+   * @param stopNow flag passed through to the stop operation
+   * @return the outcome of the request
+   * @throws RestException if the operation fails
+   */
+  TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException;
+
+  /**
+   * @return a string-to-string map describing the Storm client status
+   * @throws RestException if the operation fails
+   */
+  Map<String, String> getStormClientStatus() throws RestException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java
new file mode 100644
index 0000000..76216d0
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
+import org.apache.metron.rest.model.TopologySummary;
+
+import java.util.List;
+
+/**
+ * Service contract for reading topology summaries/statuses and for activating
+ * or deactivating a topology by name.
+ *
+ * <p>NOTE(review): descriptions are derived from the signatures only; confirm
+ * details against the implementing class.
+ */
+public interface StormStatusService {
+
+  /**
+   * @return a summary of the topologies
+   */
+  TopologySummary getTopologySummary();
+
+  /**
+   * @param name name identifying the topology
+   * @return the status of that topology
+   */
+  TopologyStatus getTopologyStatus(String name);
+
+  /**
+   * @return the status of every topology
+   */
+  List<TopologyStatus> getAllTopologyStatus();
+
+  /**
+   * @param name name identifying the topology to activate
+   * @return the outcome of the request
+   */
+  TopologyResponse activateTopology(String name);
+
+  /**
+   * @param name name identifying the topology to deactivate
+   * @return the outcome of the request
+   */
+  TopologyResponse deactivateTopology(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java
new file mode 100644
index 0000000..d1400c6
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.common.field.transformation.FieldTransformations;
+import org.apache.metron.rest.model.StellarFunctionDescription;
+import org.apache.metron.rest.model.TransformationValidation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service contract for validating rules and transformations and for listing
+ * the available field transformations and Stellar functions.
+ *
+ * <p>NOTE(review): descriptions are derived from the signatures only; confirm
+ * details against the implementing class.
+ */
+public interface TransformationService {
+
+    /**
+     * @param rules the rule strings to validate
+     * @return a map with a boolean per key (presumably rule text mapped to validity)
+     */
+    Map<String, Boolean> validateRules(List<String> rules);
+
+    /**
+     * @param transformationValidation the validation request
+     * @return a map of string keys to result objects
+     */
+    Map<String, Object> validateTransformation(TransformationValidation transformationValidation);
+
+    /**
+     * @return the available field transformations
+     */
+    FieldTransformations[] getTransformations();
+
+    /**
+     * @return descriptions of Stellar functions
+     */
+    List<StellarFunctionDescription> getStellarFunctions();
+
+    /**
+     * @return descriptions of the "simple" Stellar functions; what qualifies as
+     *         simple is defined by the implementation
+     */
+    List<StellarFunctionDescription> getSimpleStellarFunctions();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
new file mode 100644
index 0000000..d7bbb23
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+/**
+ * {@link StormCLIWrapper} variant that runs commands through
+ * {@code docker-compose ... exec storm} and copies the variables printed by
+ * {@code docker-machine env metron-machine} into the child process environment.
+ */
+public class DockerStormCLIWrapper extends StormCLIWrapper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class);
+
+  @Autowired
+  private Environment environment;
+
+  /**
+   * Builds a process for
+   * {@code docker-compose -f <docker.compose.path> -p metron exec storm <command...>}.
+   * The process environment receives {@code METRON_VERSION} (from the
+   * {@code metron.version} property) plus whatever
+   * {@link #setDockerEnvironment(Map)} adds.
+   *
+   * @param command the command and arguments to append after {@code storm}
+   * @return the configured, not yet started, process builder
+   */
+  @Override
+  protected ProcessBuilder getProcessBuilder(String... command) {
+    String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"};
+    ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command));
+    Map<String, String> pbEnvironment = pb.environment();
+    pbEnvironment.put("METRON_VERSION", environment.getProperty("metron.version"));
+    setDockerEnvironment(pbEnvironment);
+    return pb;
+  }
+
+  /**
+   * Runs the process from {@link #getDockerEnvironmentProcessBuilder()} and, for
+   * every output line starting with {@code export}, stores the
+   * {@code NAME=VALUE} pair (double quotes removed from the value) in the given
+   * map. Lines without a {@code =} are ignored. I/O failures and interruption
+   * are logged rather than propagated.
+   *
+   * @param environmentVariables mutable map that receives the parsed variables
+   */
+  protected void setDockerEnvironment(Map<String, String> environmentVariables) {
+    ProcessBuilder pb = getDockerEnvironmentProcessBuilder();
+    try {
+      Process process = pb.start();
+      // try-with-resources so the pipe is released even if parsing fails
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          if (line.startsWith("export")) {
+            // limit of 2 keeps any '=' inside the value and yields an empty
+            // value for "NAME=" instead of a one-element array
+            String[] parts = line.replaceFirst("export ", "").split("=", 2);
+            if (parts.length == 2) {
+              environmentVariables.put(parts[0], parts[1].replaceAll("\"", ""));
+            }
+          }
+        }
+      }
+      process.waitFor();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    } catch (InterruptedException e) {
+      // restore the flag so callers further up can still observe the interrupt
+      Thread.currentThread().interrupt();
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @return a builder for {@code docker-machine env metron-machine}
+   */
+  protected ProcessBuilder getDockerEnvironmentProcessBuilder() {
+    String[] command = {"docker-machine", "env", "metron-machine"};
+    return new ProcessBuilder(command);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
new file mode 100644
index 0000000..54c331a
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+
+/**
+ * {@link GlobalConfigService} backed by ZooKeeper through an injected
+ * {@link CuratorFramework} client.
+ */
+@Service
+public class GlobalConfigServiceImpl implements GlobalConfigService {
+
+    @Autowired
+    private CuratorFramework client;
+
+    /**
+     * Writes the given map to ZooKeeper as the global configuration.
+     *
+     * @param globalConfig the configuration to write
+     * @return the same map that was passed in
+     * @throws RestException wrapping any exception raised while writing
+     */
+    @Override
+    public Map<String, Object> save(Map<String, Object> globalConfig) throws RestException {
+        try {
+            ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, client);
+            return globalConfig;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Reads the global configuration bytes from ZooKeeper and deserializes them
+     * into a map.
+     *
+     * @return the configuration, or {@code null} when ZooKeeper reports that the node does not exist
+     * @throws RestException wrapping any other exception raised while reading
+     */
+    @Override
+    public Map<String, Object> get() throws RestException {
+        try {
+            byte[] rawConfig = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
+            Map<String, Object> parsed =
+                JSONUtils.INSTANCE.load(new ByteArrayInputStream(rawConfig), new TypeReference<Map<String, Object>>(){});
+            return parsed;
+        } catch (KeeperException.NoNodeException missingNode) {
+            return null;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Deletes the global configuration node.
+     *
+     * @return {@code true} when the delete call succeeded, {@code false} when the node did not exist
+     * @throws RestException wrapping any other exception raised while deleting
+     */
+    @Override
+    public boolean delete() throws RestException {
+        try {
+            client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot());
+            return true;
+        } catch (KeeperException.NoNodeException missingNode) {
+            return false;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
new file mode 100644
index 0000000..8fbea13
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.Match;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.GrokValidation;
+import org.apache.metron.rest.service.GrokService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.Map;
+
+/**
+ * {@link GrokService} implementation that exposes the injected common Grok
+ * patterns and validates user-supplied Grok statements against sample data.
+ */
+@Service
+public class GrokServiceImpl implements GrokService {
+
+    @Autowired
+    private Grok commonGrok;
+
+    /**
+     * @return the patterns held by the injected common {@link Grok} instance
+     */
+    @Override
+    public Map<String, String> getCommonGrokPatterns() {
+        return commonGrok.getPatterns();
+    }
+
+    /**
+     * Compiles the statement carried by {@code grokValidation}, matches it
+     * against the request's sample data and stores the captured fields on the
+     * request object.
+     *
+     * <p>The statement must start with a pattern label followed by a space; the
+     * label is everything before the first space. The entry keyed by that label
+     * is removed from the results before they are stored.
+     *
+     * @param grokValidation request holding the statement and sample data; receives the results
+     * @return the same {@code grokValidation} instance with results set
+     * @throws RestException if the statement has no label (no space) or if any
+     *         other exception occurs while loading patterns, compiling or matching
+     */
+    @Override
+    public GrokValidation validateGrokStatement(GrokValidation grokValidation) throws RestException {
+        Map<String, Object> results;
+        try {
+            Grok grok = new Grok();
+            // close the classpath stream even when pattern loading fails
+            try (InputStreamReader commonPatterns =
+                     new InputStreamReader(getClass().getResourceAsStream("/patterns/common"))) {
+                grok.addPatternFromReader(commonPatterns);
+            }
+            String statement = grokValidation.getStatement();
+            grok.addPatternFromReader(new StringReader(statement));
+            // indexOf returns -1 when there is no space, which makes substring throw
+            String patternLabel = statement.substring(0, statement.indexOf(" "));
+            String grokPattern = "%{" + patternLabel + "}";
+            grok.compile(grokPattern);
+            Match gm = grok.match(grokValidation.getSampleData());
+            gm.captures();
+            results = gm.toMap();
+            results.remove(patternLabel);
+        } catch (StringIndexOutOfBoundsException e) {
+            // pass the exception itself: it has no cause of its own, so
+            // e.getCause() would discard the stack trace
+            throw new RestException("A pattern label must be included (ex. PATTERN_LABEL ${PATTERN:field} ...)", e);
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+        grokValidation.setResults(results);
+        return grokValidation;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java
new file mode 100644
index 0000000..c14ec0c
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.metron.rest.service.HdfsService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * {@link HdfsService} implementation that delegates to the Hadoop
+ * {@link FileSystem} obtained from the injected {@link Configuration}.
+ */
+@Service
+public class HdfsServiceImpl implements HdfsService {
+
+    @Autowired
+    private Configuration configuration;
+
+    /**
+     * Reads the whole file at {@code path} into memory.
+     *
+     * @param path the file to read
+     * @return the file contents
+     * @throws IOException if the file cannot be opened or read
+     */
+    @Override
+    public byte[] read(Path path) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        // this copyBytes overload closes both streams once the copy finishes
+        IOUtils.copyBytes(FileSystem.get(configuration).open(path), byteArrayOutputStream, configuration);
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    /**
+     * Writes {@code contents} to {@code path}, overwriting an existing file.
+     *
+     * @param path the file to create or overwrite
+     * @param contents the bytes to write
+     * @throws IOException if the file cannot be created or written
+     */
+    @Override
+    public void write(Path path, byte[] contents) throws IOException {
+        // try-with-resources so the stream is closed even when write() throws
+        try (FSDataOutputStream fsDataOutputStream = FileSystem.get(configuration).create(path, true)) {
+            fsDataOutputStream.write(contents);
+        }
+    }
+
+    /**
+     * @param path the path to list
+     * @return the statuses returned by {@link FileSystem#listStatus(Path)}
+     * @throws IOException if the listing fails
+     */
+    @Override
+    public FileStatus[] list(Path path) throws IOException {
+        return FileSystem.get(configuration).listStatus(path);
+    }
+
+    /**
+     * @param path the path to delete
+     * @param recursive passed through to {@link FileSystem#delete(Path, boolean)}
+     * @return the result of the underlying delete call
+     * @throws IOException if the delete fails
+     */
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        return FileSystem.get(configuration).delete(path, recursive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
new file mode 100644
index 0000000..7246c2f
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import kafka.admin.AdminOperationException;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.KafkaTopic;
+import org.apache.metron.rest.service.KafkaService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link KafkaService} implementation built on {@link AdminUtils} for topic
+ * administration and a single injected {@link KafkaConsumer} for metadata and
+ * sample reads.
+ *
+ * <p>The consumer is shared, so every use of it is guarded by
+ * {@code synchronized (this)}.
+ */
+@Service
+public class KafkaServiceImpl implements KafkaService {
+
+    @Autowired
+    private ZkUtils zkUtils;
+
+    @Autowired
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    // internal topic that is hidden from listTopics()
+    private final String offsetTopic = "__consumer_offsets";
+
+    /**
+     * Creates the topic unless a topic with the same name is already listed.
+     *
+     * @param topic name, partition count, replication factor and properties of the topic
+     * @return the {@code topic} argument, whether or not a topic was created
+     * @throws RestException wrapping an {@link AdminOperationException} from the create call
+     */
+    @Override
+    public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
+        if (!listTopics().contains(topic.getName())) {
+            try {
+                AdminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+            } catch (AdminOperationException e) {
+                throw new RestException(e);
+            }
+        }
+        return topic;
+    }
+
+    /**
+     * @param name the topic to delete
+     * @return {@code true} if the topic was listed and a delete was issued, {@code false} otherwise
+     */
+    @Override
+    public boolean deleteTopic(String name) {
+        if (listTopics().contains(name)) {
+            AdminUtils.deleteTopic(zkUtils, name);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Describes a topic using the consumer's partition metadata. The
+     * replication factor is taken from the replica count of the first partition.
+     *
+     * @param name the topic to describe
+     * @return the topic description, or {@code null} if the topic is not listed
+     *         or no partition metadata is available
+     */
+    @Override
+    public KafkaTopic getTopic(String name) {
+        KafkaTopic kafkaTopic = null;
+        if (listTopics().contains(name)) {
+            List<PartitionInfo> partitionInfos;
+            // same guard as the other consumer calls in this class
+            synchronized (this) {
+                partitionInfos = kafkaConsumer.partitionsFor(name);
+            }
+            if (partitionInfos != null && partitionInfos.size() > 0) {
+                PartitionInfo partitionInfo = partitionInfos.get(0);
+                kafkaTopic = new KafkaTopic();
+                kafkaTopic.setName(name);
+                kafkaTopic.setNumPartitions(partitionInfos.size());
+                kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
+            }
+        }
+        return kafkaTopic;
+    }
+
+    /**
+     * @return the topic names reported by the consumer, without {@code __consumer_offsets}
+     */
+    @Override
+    public Set<String> listTopics() {
+        Set<String> topics;
+        synchronized (this) {
+            topics = kafkaConsumer.listTopics().keySet();
+            topics.remove(offsetTopic);
+        }
+        return topics;
+    }
+
+    /**
+     * Assigns the consumer to every partition of the topic, seeks each partition
+     * to one before its current position (when that is not negative), polls
+     * once for up to 100 ms and returns the value of the first record received.
+     * The consumer is always unsubscribed afterwards.
+     *
+     * @param topic the topic to sample
+     * @return a message value, or {@code null} if the topic is not listed or the poll returned nothing
+     */
+    @Override
+    public String getSampleMessage(String topic) {
+        String message = null;
+        if (listTopics().contains(topic)) {
+            synchronized (this) {
+                try {
+                    kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream().map(partitionInfo ->
+                            new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
+                    for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
+                        long offset = kafkaConsumer.position(topicPartition) - 1;
+                        if (offset >= 0) {
+                            kafkaConsumer.seek(topicPartition, offset);
+                        }
+                    }
+                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
+                    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+                    if (iterator.hasNext()) {
+                        ConsumerRecord<String, String> record = iterator.next();
+                        message = record.value();
+                    }
+                } finally {
+                    // release the assignment even if a consumer call above failed,
+                    // so the shared consumer is clean for the next caller
+                    kafkaConsumer.unsubscribe();
+                }
+            }
+        }
+        return message;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
new file mode 100644
index 0000000..8b3dbb7
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.SensorEnrichmentConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link SensorEnrichmentConfigService} backed by ZooKeeper through an
+ * injected {@link CuratorFramework} client.
+ */
+@Service
+public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfigService {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Autowired
+    private CuratorFramework client;
+
+    /**
+     * Serializes the configuration to JSON and writes it to ZooKeeper under the
+     * given sensor name.
+     *
+     * @param name the sensor name the configuration is stored under
+     * @param sensorEnrichmentConfig the configuration to store
+     * @return the {@code sensorEnrichmentConfig} argument
+     * @throws RestException wrapping any exception raised while serializing or writing
+     */
+    @Override
+    public SensorEnrichmentConfig save(String name, SensorEnrichmentConfig sensorEnrichmentConfig) throws RestException {
+        try {
+            // encode explicitly as UTF-8 so the stored bytes do not depend on the JVM default charset
+            byte[] json = objectMapper.writeValueAsString(sensorEnrichmentConfig)
+                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(name, json, client);
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+        return sensorEnrichmentConfig;
+    }
+
+    /**
+     * @param name the sensor name to look up
+     * @return the stored configuration, or {@code null} when ZooKeeper reports that the node does not exist
+     * @throws RestException wrapping any other exception raised while reading
+     */
+    @Override
+    public SensorEnrichmentConfig findOne(String name) throws RestException {
+        SensorEnrichmentConfig sensorEnrichmentConfig;
+        try {
+            sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client);
+        } catch (KeeperException.NoNodeException e) {
+            return null;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+        return sensorEnrichmentConfig;
+    }
+
+    /**
+     * @return a map from each name returned by {@link #getAllTypes()} to the
+     *         result of {@link #findOne(String)} for that name
+     * @throws RestException if listing or any lookup fails
+     */
+    @Override
+    public Map<String, SensorEnrichmentConfig> getAll() throws RestException {
+        Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
+        List<String> sensorNames = getAllTypes();
+        for (String name : sensorNames) {
+            sensorEnrichmentConfigs.put(name, findOne(name));
+        }
+        return sensorEnrichmentConfigs;
+    }
+
+    /**
+     * @return the child node names under the enrichment root; an empty list when the root does not exist
+     * @throws RestException wrapping any other exception raised while listing
+     */
+    @Override
+    public List<String> getAllTypes() throws RestException {
+        List<String> types;
+        try {
+            types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot());
+        } catch (KeeperException.NoNodeException e) {
+            types = new ArrayList<>();
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+        return types;
+    }
+
+    /**
+     * Deletes the node {@code <enrichment root>/<name>}.
+     *
+     * @param name the sensor name whose configuration is deleted
+     * @return {@code true} when the delete call succeeded, {@code false} when the node did not exist
+     * @throws RestException wrapping any other exception raised while deleting
+     */
+    @Override
+    public boolean delete(String name) throws RestException {
+        try {
+            client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/" + name);
+        } catch (KeeperException.NoNodeException e) {
+            return false;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+        return true;
+    }
+
+    /**
+     * @return a new mutable list containing {@code "geo"}, {@code "host"} and {@code "whois"}
+     */
+    @Override
+    public List<String> getAvailableEnrichments() {
+        // plain ArrayList instead of double-brace initialization, which would
+        // create an anonymous subclass holding a reference to this service
+        return new ArrayList<>(java.util.Arrays.asList("geo", "host", "whois"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
new file mode 100644
index 0000000..ab46418
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.SensorIndexingConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link SensorIndexingConfigService} implementation that keeps sensor indexing
+ * configs in ZooKeeper through the injected Curator client. Configs are handled
+ * as generic maps and serialized to JSON.
+ */
+@Service
+public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigService {
+
+  @Autowired
+  private ObjectMapper objectMapper;
+
+  @Autowired
+  private CuratorFramework client;
+
+  /**
+   * Serializes the config to JSON and writes it to ZooKeeper for the given sensor.
+   *
+   * @param name sensor name the config is written for
+   * @param sensorIndexingConfig config to persist
+   * @return the same map that was passed in
+   * @throws RestException wrapping any serialization or ZooKeeper failure
+   */
+  @Override
+  public Map<String, Object> save(String name, Map<String, Object> sensorIndexingConfig) throws RestException {
+    try {
+      // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
+      // charset, which would make the stored JSON depend on the host configuration.
+      byte[] serializedConfig = objectMapper.writeValueAsString(sensorIndexingConfig)
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(name, serializedConfig, client);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorIndexingConfig;
+  }
+
+  /**
+   * Reads the indexing config stored for a sensor and parses it into a map.
+   *
+   * @param name sensor name to look up
+   * @return the parsed config, or {@code null} when ZooKeeper reports no such node
+   * @throws RestException wrapping any other read or parse failure
+   */
+  @Override
+  public Map<String, Object> findOne(String name) throws RestException {
+    Map<String, Object> sensorIndexingConfig;
+    try {
+      byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client);
+      sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes),
+          new TypeReference<Map<String, Object>>(){});
+    } catch (KeeperException.NoNodeException e) {
+      return null;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorIndexingConfig;
+  }
+
+  /**
+   * Loads the config of every sensor returned by {@link #getAllTypes()}.
+   *
+   * @return map of sensor name to its config; a value is {@code null} if
+   *         {@link #findOne(String)} found no node for that name
+   * @throws RestException if listing or reading any config fails
+   */
+  @Override
+  public Map<String, Map<String, Object>> getAll() throws RestException {
+    Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
+    for (String name : getAllTypes()) {
+      sensorIndexingConfigs.put(name, findOne(name));
+    }
+    return sensorIndexingConfigs;
+  }
+
+  /**
+   * Lists the children of the indexing configuration root in ZooKeeper.
+   *
+   * @return the child node names, or an empty list when the root node does not exist
+   * @throws RestException wrapping any other ZooKeeper failure
+   */
+  @Override
+  public List<String> getAllTypes() throws RestException {
+    List<String> types;
+    try {
+      types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot());
+    } catch (KeeperException.NoNodeException e) {
+      types = new ArrayList<>();
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return types;
+  }
+
+  /**
+   * Deletes the node named {@code name} under the indexing configuration root.
+   *
+   * @param name child node name to remove
+   * @return {@code true} when the node was deleted, {@code false} when it did not exist
+   * @throws RestException wrapping any other ZooKeeper failure
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    try {
+      client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/" + name);
+    } catch (KeeperException.NoNodeException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
new file mode 100644
index 0000000..cb88708
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.ParseMessageRequest;
+import org.apache.metron.rest.service.HdfsService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.json.simple.JSONObject;
+import org.reflections.Reflections;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
+import static 
org.apache.metron.rest.MetronRestConstants.GROK_DEFAULT_PATH_SPRING_PROPERTY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_PATH_KEY;
+import static 
org.apache.metron.rest.MetronRestConstants.GROK_PATTERN_LABEL_KEY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_STATEMENT_KEY;
+import static 
org.apache.metron.rest.MetronRestConstants.GROK_TEMP_PATH_SPRING_PROPERTY;
+
+/**
+ * {@link SensorParserConfigService} implementation that stores parser configs in
+ * ZooKeeper through the injected Curator client. For grok parsers the grok
+ * statement is kept out of the serialized config and written to a separate file
+ * through {@link HdfsService}; it is read back into the config on lookup.
+ */
+@Service
+public class SensorParserConfigServiceImpl implements SensorParserConfigService {
+
+  @Autowired
+  private Environment environment;
+
+  @Autowired
+  private ObjectMapper objectMapper;
+
+  private CuratorFramework client;
+
+  @Autowired
+  public void setClient(CuratorFramework client) {
+    this.client = client;
+  }
+
+  @Autowired
+  private HdfsService hdfsService;
+
+  // Lazily built cache: shortened parser name -> fully qualified parser class name.
+  private Map<String, String> availableParsers;
+
+  /**
+   * Persists a parser config. For grok configs a grok path and pattern label are
+   * filled in when absent, the grok statement is excluded from the serialized
+   * config, and the statement itself is written through {@link HdfsService}.
+   *
+   * @param sensorParserConfig config to persist; its parser config map may be modified
+   * @return the config that was passed in
+   * @throws RestException if serialization, the grok write or the ZooKeeper write fails
+   */
+  @Override
+  public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException {
+    String serializedConfig;
+    if (isGrokConfig(sensorParserConfig)) {
+      addGrokPathToConfig(sensorParserConfig);
+      // Locale.ROOT keeps the default label independent of the JVM locale.
+      sensorParserConfig.getParserConfig().putIfAbsent(GROK_PATTERN_LABEL_KEY,
+          sensorParserConfig.getSensorTopic().toUpperCase(java.util.Locale.ROOT));
+      String statement = (String) sensorParserConfig.getParserConfig().remove(GROK_STATEMENT_KEY);
+      try {
+        serializedConfig = serialize(sensorParserConfig);
+      } finally {
+        // Restore the statement even when serialization fails, so the caller's
+        // config object is not left without it.
+        sensorParserConfig.getParserConfig().put(GROK_STATEMENT_KEY, statement);
+      }
+      saveGrokStatement(sensorParserConfig);
+    } else {
+      serializedConfig = serialize(sensorParserConfig);
+    }
+    try {
+      // Explicit UTF-8: the no-arg getBytes() would use the platform default charset.
+      ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorParserConfig.getSensorTopic(),
+          serializedConfig.getBytes(java.nio.charset.StandardCharsets.UTF_8), client);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorParserConfig;
+  }
+
+  /**
+   * Serializes a parser config to a JSON string.
+   *
+   * @throws RestException wrapping the Jackson failure if the config cannot be serialized
+   */
+  private String serialize(SensorParserConfig sensorParserConfig) throws RestException {
+    try {
+      return objectMapper.writeValueAsString(sensorParserConfig);
+    } catch (JsonProcessingException e) {
+      // Chain the exception itself; e.getCause() is frequently null and would
+      // discard the Jackson error details.
+      throw new RestException("Could not serialize SensorParserConfig",
+          "Could not serialize " + sensorParserConfig.toString(), e);
+    }
+  }
+
+  /**
+   * Reads the parser config stored for a sensor; grok configs get their grok
+   * statement added to the parser config map.
+   *
+   * @param name sensor name to look up
+   * @return the config, or {@code null} when ZooKeeper reports no such node
+   * @throws RestException wrapping any other failure
+   */
+  @Override
+  public SensorParserConfig findOne(String name) throws RestException {
+    SensorParserConfig sensorParserConfig;
+    try {
+      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client);
+      if (isGrokConfig(sensorParserConfig)) {
+        addGrokStatementToConfig(sensorParserConfig);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      return null;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorParserConfig;
+  }
+
+  /**
+   * Loads the config of every sensor found under the parser configuration root.
+   *
+   * @return the configs in the order ZooKeeper listed the sensors
+   * @throws RestException if listing or reading any config fails
+   */
+  @Override
+  public Iterable<SensorParserConfig> getAll() throws RestException {
+    List<SensorParserConfig> sensorParserConfigs = new ArrayList<>();
+    for (String name : getAllTypes()) {
+      sensorParserConfigs.add(findOne(name));
+    }
+    return sensorParserConfigs;
+  }
+
+  /**
+   * Deletes the node named {@code name} under the parser configuration root.
+   *
+   * @return {@code true} when the node was deleted, {@code false} when it did not exist
+   * @throws RestException wrapping any other ZooKeeper failure
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    try {
+      client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/" + name);
+    } catch (KeeperException.NoNodeException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return true;
+  }
+
+  /**
+   * Lists the children of the parser configuration root; empty when the root is absent.
+   */
+  private List<String> getAllTypes() throws RestException {
+    List<String> types;
+    try {
+      types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
+    } catch (KeeperException.NoNodeException e) {
+      types = new ArrayList<>();
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return types;
+  }
+
+  /**
+   * Returns the cached parser map, building it on first use from the
+   * {@link MessageParser} subtypes found by {@link #getParserClasses()}.
+   * "BasicParser" is skipped; keys are the simple class names with "Basic" and
+   * "Parser" removed, values are the fully qualified class names.
+   */
+  @Override
+  public Map<String, String> getAvailableParsers() {
+    Map<String, String> parsers = availableParsers;
+    if (parsers == null) {
+      // Fill a local map and assign the field once it is complete, so another
+      // caller never observes a partially populated cache.
+      parsers = new HashMap<>();
+      for (Class<? extends MessageParser> parserClass : getParserClasses()) {
+        String simpleName = parserClass.getSimpleName();
+        if (!"BasicParser".equals(simpleName)) {
+          parsers.put(simpleName.replaceAll("Basic|Parser", ""), parserClass.getName());
+        }
+      }
+      availableParsers = parsers;
+    }
+    return parsers;
+  }
+
+  /**
+   * Discards the cached parser map and rebuilds it.
+   */
+  @Override
+  public Map<String, String> reloadAvailableParsers() {
+    availableParsers = null;
+    return getAvailableParsers();
+  }
+
+  private Set<Class<? extends MessageParser>> getParserClasses() {
+    Reflections reflections = new Reflections("org.apache.metron.parsers");
+    return reflections.getSubTypesOf(MessageParser.class);
+  }
+
+  /**
+   * Instantiates the parser named in the request's config, configures it and
+   * parses the request's sample data. For grok configs the statement is written
+   * to a temporary file first and that file is removed afterwards.
+   *
+   * @param parseMessageRequest carries the parser config and the sample data
+   * @return the first message produced by the parser
+   * @throws RestException if the config or its parser class name is missing, the
+   *         parser cannot be instantiated, or the temporary grok file cannot be written
+   */
+  @Override
+  public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException {
+    SensorParserConfig sensorParserConfig = parseMessageRequest.getSensorParserConfig();
+    if (sensorParserConfig == null) {
+      throw new RestException("SensorParserConfig is missing from ParseMessageRequest");
+    }
+    if (sensorParserConfig.getParserClassName() == null) {
+      throw new RestException("SensorParserConfig must have a parserClassName");
+    }
+    MessageParser<JSONObject> parser;
+    try {
+      parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName()).newInstance();
+    } catch (Exception e) {
+      // Chain e itself; e.getCause() is usually null for reflection failures.
+      throw new RestException(e.toString(), e);
+    }
+    boolean grokConfig = isGrokConfig(sensorParserConfig);
+    if (grokConfig) {
+      saveTemporaryGrokStatement(sensorParserConfig);
+    }
+    try {
+      if (grokConfig) {
+        sensorParserConfig.getParserConfig().put(GROK_PATH_KEY,
+            new File(getTemporaryGrokRootPath(), sensorParserConfig.getSensorTopic()).toString());
+      }
+      parser.configure(sensorParserConfig.getParserConfig());
+      return parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0);
+    } finally {
+      // Remove the temporary grok file even when configure/parse throws.
+      if (grokConfig) {
+        deleteTemporaryGrokStatement(sensorParserConfig);
+      }
+    }
+  }
+
+  private boolean isGrokConfig(SensorParserConfig sensorParserConfig) {
+    return GROK_CLASS_NAME.equals(sensorParserConfig.getParserClassName());
+  }
+
+  /**
+   * Reads the grok file referenced by the config, strips the first
+   * "&lt;patternLabel&gt; " occurrence and stores the remainder in the parser config
+   * under the grok statement key. An empty statement is stored when no path is set.
+   */
+  private void addGrokStatementToConfig(SensorParserConfig sensorParserConfig) throws RestException {
+    String grokStatement = "";
+    String grokPath = (String) sensorParserConfig.getParserConfig().get(GROK_PATH_KEY);
+    if (grokPath != null) {
+      String fullGrokStatement = getGrokStatement(grokPath);
+      String patternLabel = (String) sensorParserConfig.getParserConfig().get(GROK_PATTERN_LABEL_KEY);
+      if (patternLabel == null) {
+        // No label to strip; previously this removed the literal text "null ".
+        grokStatement = fullGrokStatement;
+      } else {
+        // Quote the label so it is matched literally rather than as a regex.
+        grokStatement = fullGrokStatement.replaceFirst(
+            java.util.regex.Pattern.quote(patternLabel + " "), "");
+      }
+    }
+    sensorParserConfig.getParserConfig().put(GROK_STATEMENT_KEY, grokStatement);
+  }
+
+  /**
+   * Sets a default grok path (configured default directory + sensor topic) when the
+   * config has a grok statement but no grok path.
+   */
+  private void addGrokPathToConfig(SensorParserConfig sensorParserConfig) {
+    if (sensorParserConfig.getParserConfig().get(GROK_PATH_KEY) == null) {
+      String grokStatement = (String) sensorParserConfig.getParserConfig().get(GROK_STATEMENT_KEY);
+      if (grokStatement != null) {
+        sensorParserConfig.getParserConfig().put(GROK_PATH_KEY,
+            new Path(environment.getProperty(GROK_DEFAULT_PATH_SPRING_PROPERTY),
+                sensorParserConfig.getSensorTopic()).toString());
+      }
+    }
+  }
+
+  private String getGrokStatement(String path) throws RestException {
+    try {
+      return new String(hdfsService.read(new Path(path)));
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+  }
+
+  private void saveGrokStatement(SensorParserConfig sensorParserConfig) throws RestException {
+    saveGrokStatement(sensorParserConfig, false);
+  }
+
+  private void saveTemporaryGrokStatement(SensorParserConfig sensorParserConfig) throws RestException {
+    saveGrokStatement(sensorParserConfig, true);
+  }
+
+  /**
+   * Writes "&lt;patternLabel&gt; &lt;grokStatement&gt;" either through {@link HdfsService} to
+   * the configured grok path, or, when {@code isTemporary}, to a local file named
+   * after the sensor topic under the temporary grok directory.
+   *
+   * @throws RestException if no grok statement is present or the write fails
+   */
+  private void saveGrokStatement(SensorParserConfig sensorParserConfig, boolean isTemporary) throws RestException {
+    String patternLabel = (String) sensorParserConfig.getParserConfig().get(GROK_PATTERN_LABEL_KEY);
+    String grokPath = (String) sensorParserConfig.getParserConfig().get(GROK_PATH_KEY);
+    String grokStatement = (String) sensorParserConfig.getParserConfig().get(GROK_STATEMENT_KEY);
+    if (grokStatement == null) {
+      throw new RestException("A grokStatement must be provided");
+    }
+    String fullGrokStatement = patternLabel + " " + grokStatement;
+    try {
+      if (!isTemporary) {
+        hdfsService.write(new Path(grokPath), fullGrokStatement.getBytes());
+      } else {
+        File grokDirectory = new File(getTemporaryGrokRootPath());
+        if (!grokDirectory.exists()) {
+          grokDirectory.mkdirs();
+        }
+        // NOTE(review): the sensor topic is used as a file name as-is; confirm it is
+        // validated upstream so it cannot escape the temporary directory.
+        // try-with-resources closes the writer even when write() fails.
+        try (FileWriter fileWriter = new FileWriter(new File(grokDirectory, sensorParserConfig.getSensorTopic()))) {
+          fileWriter.write(fullGrokStatement);
+        }
+      }
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+  }
+
+  private void deleteTemporaryGrokStatement(SensorParserConfig sensorParserConfig) {
+    File file = new File(getTemporaryGrokRootPath(), sensorParserConfig.getSensorTopic());
+    file.delete();
+  }
+
+  /**
+   * Builds the temporary grok directory: the configured temp path plus the name of
+   * the currently authenticated user.
+   */
+  private String getTemporaryGrokRootPath() {
+    String grokTempPath = environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY);
+    Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
+    return new Path(grokTempPath, authentication.getName()).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
new file mode 100644
index 0000000..cb7c449
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatusCode;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.StormAdminService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+@Service
+public class StormAdminServiceImpl implements StormAdminService {
+
+    private StormCLIWrapper stormCLIClientWrapper;
+
+    @Autowired
+    public void setStormCLIClientWrapper(StormCLIWrapper 
stormCLIClientWrapper) {
+        this.stormCLIClientWrapper = stormCLIClientWrapper;
+    }
+
+    @Autowired
+    private GlobalConfigService globalConfigService;
+
+    @Autowired
+    private SensorParserConfigService sensorParserConfigService;
+
+    @Override
+    public TopologyResponse startParserTopology(String name) throws 
RestException {
+        TopologyResponse topologyResponse = new TopologyResponse();
+        if (globalConfigService.get() == null) {
+            
topologyResponse.setErrorMessage(TopologyStatusCode.GLOBAL_CONFIG_MISSING.toString());
+        } else if (sensorParserConfigService.findOne(name) == null) {
+            
topologyResponse.setErrorMessage(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING.toString());
+        } else {
+            topologyResponse = 
createResponse(stormCLIClientWrapper.startParserTopology(name), 
TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+        }
+        return topologyResponse;
+    }
+
+    @Override
+    public TopologyResponse stopParserTopology(String name, boolean stopNow) 
throws RestException {
+        return createResponse(stormCLIClientWrapper.stopParserTopology(name, 
stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+    }
+
+    @Override
+    public TopologyResponse startEnrichmentTopology() throws RestException {
+        return createResponse(stormCLIClientWrapper.startEnrichmentTopology(), 
TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+    }
+
+    @Override
+    public TopologyResponse stopEnrichmentTopology(boolean stopNow) throws 
RestException {
+        return 
createResponse(stormCLIClientWrapper.stopEnrichmentTopology(stopNow), 
TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+    }
+
+    @Override
+    public TopologyResponse startIndexingTopology() throws RestException {
+        return createResponse(stormCLIClientWrapper.startIndexingTopology(), 
TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+    }
+
+    @Override
+    public TopologyResponse stopIndexingTopology(boolean stopNow) throws 
RestException {
+        return 
createResponse(stormCLIClientWrapper.stopIndexingTopology(stopNow), 
TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+    }
+
+    private TopologyResponse createResponse(int responseCode, 
TopologyStatusCode successMessage, TopologyStatusCode errorMessage) {
+        TopologyResponse topologyResponse = new TopologyResponse();
+        if (responseCode == 0) {
+            topologyResponse.setSuccessMessage(successMessage.toString());
+        } else {
+            topologyResponse.setErrorMessage(errorMessage.toString());
+        }
+        return topologyResponse;
+    }
+
+    @Override
+    public Map<String, String> getStormClientStatus() throws RestException {
+        return stormCLIClientWrapper.getStormClientStatus();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
new file mode 100644
index 0000000..a472515
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
+import static 
org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME;
+
+/**
+ * Runs topology start scripts and the {@code storm} command line client as
+ * external processes. Script paths and connection URLs are read from the Spring
+ * {@link Environment}.
+ */
+public class StormCLIWrapper {
+
+  @Autowired
+  private Environment environment;
+
+  public int startParserTopology(String name) throws RestException {
+    return runCommand(getParserStartCommand(name));
+  }
+
+  public int stopParserTopology(String name, boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(name, stopNow));
+  }
+
+  public int startEnrichmentTopology() throws RestException {
+    return runCommand(getEnrichmentStartCommand());
+  }
+
+  public int stopEnrichmentTopology(boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(ENRICHMENT_TOPOLOGY_NAME, stopNow));
+  }
+
+  public int startIndexingTopology() throws RestException {
+    return runCommand(getIndexingStartCommand());
+  }
+
+  public int stopIndexingTopology(boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(INDEXING_TOPOLOGY_NAME, stopNow));
+  }
+
+  /**
+   * Starts the command with inherited standard streams and blocks until it exits.
+   *
+   * @param command program and arguments
+   * @return the exit value of the process
+   * @throws RestException if the process cannot be started or the wait is interrupted
+   */
+  protected int runCommand(String[] command) throws RestException {
+    ProcessBuilder pb = getProcessBuilder(command);
+    pb.inheritIO();
+    Process process;
+    try {
+      process = pb.start();
+      process.waitFor();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RestException(e);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return process.exitValue();
+  }
+
+  /**
+   * Builds {@code <parser script> -k <kafka broker url> -z <zookeeper url> -s <name>}.
+   */
+  protected String[] getParserStartCommand(String name) {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY),
+        "-k",
+        environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY),
+        "-z",
+        environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY),
+        "-s",
+        name
+    };
+  }
+
+  protected String[] getEnrichmentStartCommand() {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)
+    };
+  }
+
+  protected String[] getIndexingStartCommand() {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)
+    };
+  }
+
+  /**
+   * Builds {@code storm kill <name>}, with {@code -w 0} appended when {@code stopNow} is set.
+   */
+  protected String[] getStopCommand(String name, boolean stopNow) {
+    if (stopNow) {
+      return new String[] {"storm", "kill", name, "-w", "0"};
+    }
+    return new String[] {"storm", "kill", name};
+  }
+
+  protected ProcessBuilder getProcessBuilder(String... command) {
+    return new ProcessBuilder(command);
+  }
+
+  /**
+   * Reports the configured script paths and the installed Storm client version.
+   *
+   * @return map with keys parserScriptPath, enrichmentScriptPath, indexingScriptPath
+   *         and stormClientVersionInstalled
+   * @throws RestException if the version check process cannot be run
+   */
+  public Map<String, String> getStormClientStatus() throws RestException {
+    Map<String, String> status = new HashMap<>();
+    status.put("parserScriptPath", environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("enrichmentScriptPath", environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("indexingScriptPath", environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("stormClientVersionInstalled", stormClientVersionInstalled());
+    return status;
+  }
+
+  /**
+   * Runs {@code storm version} and derives the version from the second output
+   * line with a leading "Storm " removed.
+   *
+   * @return the version text, or "Storm client is not installed" when the command
+   *         produced fewer than two lines
+   * @throws RestException if the process cannot be started or its output stream
+   *         cannot be closed
+   */
+  protected String stormClientVersionInstalled() throws RestException {
+    String stormClientVersionInstalled = "Storm client is not installed";
+    ProcessBuilder pb = getProcessBuilder("storm", "version");
+    pb.redirectErrorStream(true);
+    List<String> lines;
+    try {
+      Process p = pb.start();
+      // try-with-resources releases the process output stream once it is drained.
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+        lines = reader.lines().collect(toList());
+      }
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+    // NOTE(review): echoes the command output to stdout; consider a logger instead.
+    lines.forEach(System.out::println);
+    if (lines.size() > 1) {
+      stormClientVersionInstalled = lines.get(1).replaceFirst("Storm ", "");
+    }
+    return stormClientVersionInstalled;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
new file mode 100644
index 0000000..280d72a
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
+import org.apache.metron.rest.model.TopologyStatusCode;
+import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.StormStatusService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
+
+/**
+ * {@link StormStatusService} implementation that queries and controls topologies
+ * over HTTP using the host configured under the Storm UI Spring property.
+ */
+@Service
+public class StormStatusServiceImpl implements StormStatusService {
+
+  @Autowired
+  private Environment environment;
+
+  @Autowired
+  private RestTemplate restTemplate;
+
+  /**
+   * Fetches the topology summary from the Storm UI.
+   */
+  @Override
+  public TopologySummary getTopologySummary() {
+    return restTemplate.getForObject(
+        "http://" + environment.getProperty(STORM_UI_SPRING_PROPERTY) + TOPOLOGY_SUMMARY_URL,
+        TopologySummary.class);
+  }
+
+  /**
+   * Fetches the status of the topology with the given name.
+   *
+   * @return the status, or {@code null} when the summary lists no topology with that name
+   */
+  @Override
+  public TopologyStatus getTopologyStatus(String name) {
+    String id = findTopologyId(name);
+    if (id == null) {
+      return null;
+    }
+    return restTemplate.getForObject(topologyUrl(id), TopologyStatus.class);
+  }
+
+  /**
+   * Fetches the status of every topology listed in the summary, one request each.
+   */
+  @Override
+  public List<TopologyStatus> getAllTopologyStatus() {
+    List<TopologyStatus> topologyStatus = new ArrayList<>();
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      topologyStatus.add(restTemplate.getForObject(topologyUrl(topology.getId()), TopologyStatus.class));
+    }
+    return topologyStatus;
+  }
+
+  /**
+   * Activates the named topology.
+   *
+   * @return a response with ACTIVE as success message, or an error message holding
+   *         either the reported status or TOPOLOGY_NOT_FOUND
+   */
+  @Override
+  public TopologyResponse activateTopology(String name) {
+    return changeTopologyState(name, "activate", TopologyStatusCode.ACTIVE);
+  }
+
+  /**
+   * Deactivates the named topology.
+   *
+   * @return a response with INACTIVE as success message, or an error message holding
+   *         either the reported status or TOPOLOGY_NOT_FOUND
+   */
+  @Override
+  public TopologyResponse deactivateTopology(String name) {
+    return changeTopologyState(name, "deactivate", TopologyStatusCode.INACTIVE);
+  }
+
+  /** Builds the URL of a single topology: http://<storm ui><topology url>/<id>. */
+  private String topologyUrl(String id) {
+    return "http://" + environment.getProperty(STORM_UI_SPRING_PROPERTY) + TOPOLOGY_URL + "/" + id;
+  }
+
+  /** Returns the id of the first topology in the summary named {@code name}, or null. */
+  private String findTopologyId(String name) {
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      if (name.equals(topology.getName())) {
+        return topology.getId();
+      }
+    }
+    return null;
+  }
+
+  /** Posts to the topology's {@code action} endpoint and maps the "status" field to a response. */
+  private TopologyResponse changeTopologyState(String name, String action, TopologyStatusCode successCode) {
+    TopologyResponse topologyResponse = new TopologyResponse();
+    String id = findTopologyId(name);
+    if (id == null) {
+      topologyResponse.setErrorMessage(TopologyStatusCode.TOPOLOGY_NOT_FOUND.toString());
+      return topologyResponse;
+    }
+    Map<?, ?> result = restTemplate.postForObject(topologyUrl(id) + "/" + action, null, Map.class);
+    if ("success".equals(result.get("status"))) {
+      topologyResponse.setSuccessMessage(successCode.toString());
+    } else {
+      topologyResponse.setErrorMessage((String) result.get("status"));
+    }
+    return topologyResponse;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
new file mode 100644
index 0000000..b26dc82
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.StellarFunctionInfo;
+import 
org.apache.metron.common.dsl.functions.resolver.SingletonFunctionResolver;
+import org.apache.metron.common.field.transformation.FieldTransformations;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.rest.model.StellarFunctionDescription;
+import org.apache.metron.rest.model.TransformationValidation;
+import org.apache.metron.rest.service.TransformationService;
+import org.json.simple.JSONObject;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * {@link TransformationService} implementation that validates Stellar rules, applies the
+ * field transformations of a sensor parser config to sample data, and lists the available
+ * field transformations and Stellar functions.
+ */
+@Service
+public class TransformationServiceImpl implements TransformationService {
+
+    /**
+     * Validates each rule with a {@link StellarProcessor} against an empty context.
+     *
+     * @param rules the rule expressions to validate
+     * @return a map from each rule to the validation outcome; a rule whose validation throws
+     *         a {@link ParseException} is mapped to {@code false}
+     */
+    @Override
+    public Map<String, Boolean> validateRules(List<String> rules) {
+        StellarProcessor processor = new StellarProcessor();
+        Map<String, Boolean> outcomeByRule = new HashMap<>();
+        for (String rule : rules) {
+            boolean valid;
+            try {
+                valid = processor.validate(rule, Context.EMPTY_CONTEXT());
+            } catch (ParseException e) {
+                // A rule that fails to parse is reported as invalid rather than propagated.
+                valid = false;
+            }
+            outcomeByRule.put(rule, valid);
+        }
+        return outcomeByRule;
+    }
+
+    /**
+     * Applies every field transformation of the request's sensor parser config, in order,
+     * to a {@link JSONObject} built from the request's sample data.
+     *
+     * @param transformationValidation carries the sample data and the sensor parser config
+     * @return the JSON object built from the sample data after all transformations ran
+     */
+    @Override
+    public Map<String, Object> validateTransformation(TransformationValidation transformationValidation) {
+        JSONObject sampleJson = new JSONObject(transformationValidation.getSampleData());
+        transformationValidation.getSensorParserConfig().getFieldTransformations().forEach(
+                fieldTransformer -> fieldTransformer.transformAndUpdate(
+                        sampleJson,
+                        transformationValidation.getSensorParserConfig().getParserConfig(),
+                        Context.EMPTY_CONTEXT()));
+        return sampleJson;
+    }
+
+    /**
+     * @return all constants of the {@link FieldTransformations} enum
+     */
+    @Override
+    public FieldTransformations[] getTransformations() {
+        return FieldTransformations.values();
+    }
+
+    /**
+     * Describes every function reported by the {@link SingletonFunctionResolver}.
+     *
+     * @return one description (name, description, params, returns) per function, in the
+     *         order the resolver supplies them
+     */
+    @Override
+    public List<StellarFunctionDescription> getStellarFunctions() {
+        Iterable<StellarFunctionInfo> functionInfos = SingletonFunctionResolver.getInstance().getFunctionInfo();
+        List<StellarFunctionDescription> descriptions = new ArrayList<>();
+        for (StellarFunctionInfo info : functionInfos) {
+            descriptions.add(new StellarFunctionDescription(
+                    info.getName(),
+                    info.getDescription(),
+                    info.getParams(),
+                    info.getReturns()));
+        }
+        return descriptions;
+    }
+
+    /**
+     * Lists the functions from {@link #getStellarFunctions()} that take exactly one
+     * parameter.
+     *
+     * @return the single-parameter function descriptions, sorted by name
+     */
+    @Override
+    public List<StellarFunctionDescription> getSimpleStellarFunctions() {
+        return getStellarFunctions().stream()
+                .filter(description -> description.getParams().length == 1)
+                .sorted((left, right) -> left.getName().compareTo(right.getName()))
+                .collect(Collectors.toList());
+    }
+
+}

Reply via email to