http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java new file mode 100644 index 0000000..efda639 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserConfigService.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.service; + +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.model.ParseMessageRequest; +import org.json.simple.JSONObject; + +import java.util.Map; + +public interface SensorParserConfigService { + + SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException; + + SensorParserConfig findOne(String name) throws RestException; + + Iterable<SensorParserConfig> getAll() throws RestException; + + boolean delete(String name) throws RestException; + + Map<String, String> getAvailableParsers(); + + Map<String, String> reloadAvailableParsers(); + + JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException; +}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java new file mode 100644 index 0000000..8c1e228 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.service; + +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.model.TopologyResponse; + +import java.util.Map; + +public interface StormAdminService { + + TopologyResponse startParserTopology(String name) throws RestException; + + TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException; + + TopologyResponse startEnrichmentTopology() throws RestException; + + TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException; + + TopologyResponse startIndexingTopology() throws RestException; + + TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException; + + Map<String, String> getStormClientStatus() throws RestException; +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java new file mode 100644 index 0000000..76216d0 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormStatusService.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service; + +import org.apache.metron.rest.model.TopologyResponse; +import org.apache.metron.rest.model.TopologyStatus; +import org.apache.metron.rest.model.TopologySummary; + +import java.util.List; + +public interface StormStatusService { + + TopologySummary getTopologySummary(); + + TopologyStatus getTopologyStatus(String name); + + List<TopologyStatus> getAllTopologyStatus(); + + TopologyResponse activateTopology(String name); + + TopologyResponse deactivateTopology(String name); +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java new file mode 100644 index 0000000..d1400c6 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/TransformationService.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service; + +import org.apache.metron.common.field.transformation.FieldTransformations; +import org.apache.metron.rest.model.StellarFunctionDescription; +import org.apache.metron.rest.model.TransformationValidation; + +import java.util.List; +import java.util.Map; + +public interface TransformationService { + + Map<String, Boolean> validateRules(List<String> rules); + + Map<String, Object> validateTransformation(TransformationValidation transformationValidation); + + FieldTransformations[] getTransformations(); + + List<StellarFunctionDescription> getStellarFunctions(); + + List<StellarFunctionDescription> getSimpleStellarFunctions(); + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java new file mode 100644 index 0000000..d7bbb23 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import org.apache.commons.lang3.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Map; + +public class DockerStormCLIWrapper extends StormCLIWrapper { + + private Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class); + + @Autowired + private Environment environment; + + @Override + protected ProcessBuilder getProcessBuilder(String... 
command) { + String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"}; + ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command)); + Map<String, String> pbEnvironment = pb.environment(); + pbEnvironment.put("METRON_VERSION", environment.getProperty("metron.version")); + setDockerEnvironment(pbEnvironment); + return pb; + } + + protected void setDockerEnvironment(Map<String, String> environmentVariables) { + ProcessBuilder pb = getDockerEnvironmentProcessBuilder(); + try { + Process process = pb.start(); + BufferedReader inputStream = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + while((line = inputStream.readLine()) != null) { + if (line.startsWith("export")) { + String[] parts = line.replaceFirst("export ", "").split("="); + environmentVariables.put(parts[0], parts[1].replaceAll("\"", "")); + } + } + process.waitFor(); + } catch (IOException | InterruptedException e) { + LOG.error(e.getMessage(), e); + } + } + + protected ProcessBuilder getDockerEnvironmentProcessBuilder() { + String[] command = {"docker-machine", "env", "metron-machine"}; + return new ProcessBuilder(command); + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java new file mode 100644 index 0000000..54c331a --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.GlobalConfigService; +import org.apache.zookeeper.KeeperException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.ByteArrayInputStream; +import java.util.Map; + +@Service +public class GlobalConfigServiceImpl implements GlobalConfigService { + + @Autowired + private CuratorFramework client; + + @Override + public Map<String, Object> save(Map<String, Object> globalConfig) throws RestException { + try { + ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig, client); + } catch (Exception e) { + throw new RestException(e); + } + return globalConfig; + } + + @Override + public Map<String, Object> get() throws RestException { + Map<String, Object> globalConfig; + try { + byte[] globalConfigBytes = 
ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client); + globalConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(globalConfigBytes), new TypeReference<Map<String, Object>>(){}); + } catch (KeeperException.NoNodeException e) { + return null; + } catch (Exception e) { + throw new RestException(e); + } + return globalConfig; + } + + @Override + public boolean delete() throws RestException { + try { + client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot()); + } catch (KeeperException.NoNodeException e) { + return false; + } catch (Exception e) { + throw new RestException(e); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java new file mode 100644 index 0000000..8fbea13 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import oi.thekraken.grok.api.Grok; +import oi.thekraken.grok.api.Match; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.model.GrokValidation; +import org.apache.metron.rest.service.GrokService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.InputStreamReader; +import java.io.StringReader; +import java.util.Map; + +@Service +public class GrokServiceImpl implements GrokService { + + @Autowired + private Grok commonGrok; + + @Override + public Map<String, String> getCommonGrokPatterns() { + return commonGrok.getPatterns(); + } + + @Override + public GrokValidation validateGrokStatement(GrokValidation grokValidation) throws RestException { + Map<String, Object> results; + try { + Grok grok = new Grok(); + grok.addPatternFromReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common"))); + grok.addPatternFromReader(new StringReader(grokValidation.getStatement())); + String patternLabel = grokValidation.getStatement().substring(0, grokValidation.getStatement().indexOf(" ")); + String grokPattern = "%{" + patternLabel + "}"; + grok.compile(grokPattern); + Match gm = grok.match(grokValidation.getSampleData()); + gm.captures(); + results = gm.toMap(); + results.remove(patternLabel); + } catch (StringIndexOutOfBoundsException e) { + throw new RestException("A pattern label must be included (ex. 
PATTERN_LABEL ${PATTERN:field} ...)", e.getCause()); + } catch (Exception e) { + throw new RestException(e); + } + grokValidation.setResults(results); + return grokValidation; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java new file mode 100644 index 0000000..c14ec0c --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/HdfsServiceImpl.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.service.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.metron.rest.service.HdfsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +@Service +public class HdfsServiceImpl implements HdfsService { + + @Autowired + private Configuration configuration; + + @Override + public byte[] read(Path path) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + IOUtils.copyBytes(FileSystem.get(configuration).open(path), byteArrayOutputStream, configuration); + return byteArrayOutputStream.toByteArray(); + } + + @Override + public void write(Path path, byte[] contents) throws IOException { + FSDataOutputStream fsDataOutputStream = FileSystem.get(configuration).create(path, true); + fsDataOutputStream.write(contents); + fsDataOutputStream.close(); + } + + @Override + public FileStatus[] list(Path path) throws IOException { + return FileSystem.get(configuration).listStatus(path); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + return FileSystem.get(configuration).delete(path, recursive); + } + } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java new file mode 100644 index 0000000..7246c2f 
--- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.service.impl; + +import kafka.admin.AdminOperationException; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.model.KafkaTopic; +import org.apache.metron.rest.service.KafkaService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Service +public class KafkaServiceImpl implements KafkaService { + + @Autowired + private ZkUtils zkUtils; + + @Autowired + private KafkaConsumer<String, String> kafkaConsumer; + + private String offsetTopic 
= "__consumer_offsets"; + + @Override + public KafkaTopic createTopic(KafkaTopic topic) throws RestException { + if (!listTopics().contains(topic.getName())) { + try { + AdminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$); + } catch (AdminOperationException e) { + throw new RestException(e); + } + } + return topic; + } + + @Override + public boolean deleteTopic(String name) { + if (listTopics().contains(name)) { + AdminUtils.deleteTopic(zkUtils, name); + return true; + } else { + return false; + } + } + + @Override + public KafkaTopic getTopic(String name) { + KafkaTopic kafkaTopic = null; + if (listTopics().contains(name)) { + List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(name); + if (partitionInfos.size() > 0) { + PartitionInfo partitionInfo = partitionInfos.get(0); + kafkaTopic = new KafkaTopic(); + kafkaTopic.setName(name); + kafkaTopic.setNumPartitions(partitionInfos.size()); + kafkaTopic.setReplicationFactor(partitionInfo.replicas().length); + } + } + return kafkaTopic; + } + + @Override + public Set<String> listTopics() { + Set<String> topics; + synchronized (this) { + topics = kafkaConsumer.listTopics().keySet(); + topics.remove(offsetTopic); + } + return topics; + } + + @Override + public String getSampleMessage(String topic) { + String message = null; + if (listTopics().contains(topic)) { + synchronized (this) { + kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream().map(partitionInfo -> + new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList())); + for (TopicPartition topicPartition : kafkaConsumer.assignment()) { + long offset = kafkaConsumer.position(topicPartition) - 1; + if (offset >= 0) { + kafkaConsumer.seek(topicPartition, offset); + } + } + ConsumerRecords<String, String> records = kafkaConsumer.poll(100); + Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); + if 
(iterator.hasNext()) { + ConsumerRecord<String, String> record = iterator.next(); + message = record.value(); + } + kafkaConsumer.unsubscribe(); + } + } + return message; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java new file mode 100644 index 0000000..8b3dbb7 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.service.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.SensorEnrichmentConfigService; +import org.apache.zookeeper.KeeperException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfigService { + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private CuratorFramework client; + + @Override + public SensorEnrichmentConfig save(String name, SensorEnrichmentConfig sensorEnrichmentConfig) throws RestException { + try { + ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(name, objectMapper.writeValueAsString(sensorEnrichmentConfig).getBytes(), client); + } catch (Exception e) { + throw new RestException(e); + } + return sensorEnrichmentConfig; + } + + @Override + public SensorEnrichmentConfig findOne(String name) throws RestException { + SensorEnrichmentConfig sensorEnrichmentConfig; + try { + sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client); + } catch (KeeperException.NoNodeException e) { + return null; + } catch (Exception e) { + throw new RestException(e); + } + return sensorEnrichmentConfig; + } + + @Override + public Map<String, SensorEnrichmentConfig> getAll() throws RestException { + Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>(); + List<String> sensorNames = getAllTypes(); + for (String name : 
sensorNames) { + sensorEnrichmentConfigs.put(name, findOne(name)); + } + return sensorEnrichmentConfigs; + } + + @Override + public List<String> getAllTypes() throws RestException { + List<String> types; + try { + types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()); + } catch (KeeperException.NoNodeException e) { + types = new ArrayList<>(); + } catch (Exception e) { + throw new RestException(e); + } + return types; + } + + @Override + public boolean delete(String name) throws RestException { + try { + client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/" + name); + } catch (KeeperException.NoNodeException e) { + return false; + } catch (Exception e) { + throw new RestException(e); + } + return true; + } + + @Override + public List<String> getAvailableEnrichments() { + return new ArrayList<String>() {{ + add("geo"); + add("host"); + add("whois"); + }}; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java new file mode 100644 index 0000000..ab46418 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.SensorIndexingConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stores, reads and deletes per-sensor indexing configurations kept in ZooKeeper
+ * under the INDEXING configuration root.
+ */
+@Service
+public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigService {
+
+  @Autowired
+  private ObjectMapper objectMapper;
+
+  @Autowired
+  private CuratorFramework client;
+
+  /**
+   * Serializes the given configuration to JSON and writes it to ZooKeeper for the named sensor.
+   *
+   * @param name sensor name the configuration is stored under
+   * @param sensorIndexingConfig configuration to persist
+   * @return the same {@code sensorIndexingConfig} instance that was passed in
+   * @throws RestException wrapping any serialization or ZooKeeper failure
+   */
+  @Override
+  public Map<String, Object> save(String name, Map<String, Object> sensorIndexingConfig) throws RestException {
+    try {
+      // writeValueAsBytes always encodes as UTF-8, whereas String.getBytes() would use the
+      // platform default charset and could corrupt non-ASCII values.
+      byte[] serializedConfig = objectMapper.writeValueAsBytes(sensorIndexingConfig);
+      ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(name, serializedConfig, client);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorIndexingConfig;
+  }
+
+  /**
+   * Reads the indexing configuration stored for the named sensor.
+   *
+   * @param name sensor name to look up
+   * @return the parsed configuration, or {@code null} when ZooKeeper reports that the node does not exist
+   * @throws RestException wrapping any other read or parse failure
+   */
+  @Override
+  public Map<String, Object> findOne(String name) throws RestException {
+    Map<String, Object> sensorIndexingConfig;
+    try {
+      byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client);
+      sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes), new TypeReference<Map<String, Object>>(){});
+    } catch (KeeperException.NoNodeException e) {
+      return null;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorIndexingConfig;
+  }
+
+  /**
+   * Loads the configuration of every sensor returned by {@link #getAllTypes()}.
+   *
+   * @return a map from sensor name to its configuration as returned by {@link #findOne(String)}
+   * @throws RestException if listing the sensors or reading any configuration fails
+   */
+  @Override
+  public Map<String, Map<String, Object>> getAll() throws RestException {
+    Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
+    List<String> sensorNames = getAllTypes();
+    for (String name : sensorNames) {
+      sensorIndexingConfigs.put(name, findOne(name));
+    }
+    return sensorIndexingConfigs;
+  }
+
+  /**
+   * Lists the child nodes of the INDEXING configuration root in ZooKeeper.
+   *
+   * @return the child node names, or an empty list when the root node does not exist
+   * @throws RestException wrapping any other failure raised while querying ZooKeeper
+   */
+  @Override
+  public List<String> getAllTypes() throws RestException {
+    List<String> types;
+    try {
+      types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot());
+    } catch (KeeperException.NoNodeException e) {
+      types = new ArrayList<>();
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return types;
+  }
+
+  /**
+   * Deletes the node named {@code name} under the INDEXING configuration root.
+   *
+   * @param name sensor name whose node is removed
+   * @return {@code true} when the delete call succeeded, {@code false} when the node did not exist
+   * @throws RestException wrapping any other failure raised while deleting the node
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    try {
+      client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/" + name);
+    } catch (KeeperException.NoNodeException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
new file mode 100644
index 0000000..cb88708
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.ParseMessageRequest;
+import org.apache.metron.rest.service.HdfsService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.json.simple.JSONObject;
+import org.reflections.Reflections;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.metron.rest.MetronRestConstants.GROK_CLASS_NAME;
+import static org.apache.metron.rest.MetronRestConstants.GROK_DEFAULT_PATH_SPRING_PROPERTY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_PATH_KEY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_PATTERN_LABEL_KEY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_STATEMENT_KEY;
+import static org.apache.metron.rest.MetronRestConstants.GROK_TEMP_PATH_SPRING_PROPERTY;
+
+/**
+ * Manages sensor parser configurations stored in ZooKeeper, keeps the grok statement of
+ * grok-based parsers in HDFS, and can run a parser against sample data.
+ */
+@Service
+public class SensorParserConfigServiceImpl implements SensorParserConfigService {
+
+  @Autowired
+  private Environment environment;
+
+  @Autowired
+  private ObjectMapper objectMapper;
+
+  private CuratorFramework client;
+
+  @Autowired
+  public void setClient(CuratorFramework client) {
+    this.client = client;
+  }
+
+  @Autowired
+  private HdfsService hdfsService;
+
+  // Lazily built by getAvailableParsers(); reset to null by reloadAvailableParsers().
+  private Map<String, String> availableParsers;
+
+  /**
+   * Writes the parser configuration to ZooKeeper under its sensor topic. For grok parsers the
+   * grok statement is left out of the serialized configuration and written to HDFS instead.
+   *
+   * @param sensorParserConfig configuration to persist; its parser config map may be modified
+   *        (grok path and pattern label are filled in when absent)
+   * @return the same {@code sensorParserConfig} instance
+   * @throws RestException if serialization fails, the grok statement is missing or cannot be
+   *         written, or the ZooKeeper write fails
+   */
+  @Override
+  public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException {
+    String serializedConfig;
+    if (isGrokConfig(sensorParserConfig)) {
+      addGrokPathToConfig(sensorParserConfig);
+      sensorParserConfig.getParserConfig().putIfAbsent(MetronRestConstants.GROK_PATTERN_LABEL_KEY, sensorParserConfig.getSensorTopic().toUpperCase());
+      // The statement is removed only while serializing so that it is not stored in ZooKeeper.
+      String statement = (String) sensorParserConfig.getParserConfig().remove(MetronRestConstants.GROK_STATEMENT_KEY);
+      serializedConfig = serialize(sensorParserConfig);
+      sensorParserConfig.getParserConfig().put(MetronRestConstants.GROK_STATEMENT_KEY, statement);
+      saveGrokStatement(sensorParserConfig);
+    } else {
+      serializedConfig = serialize(sensorParserConfig);
+    }
+    try {
+      // Encode explicitly as UTF-8 instead of relying on the platform default charset.
+      ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensorParserConfig.getSensorTopic(), serializedConfig.getBytes(StandardCharsets.UTF_8), client);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorParserConfig;
+  }
+
+  /** Serializes the configuration to a JSON string, wrapping Jackson failures in a RestException. */
+  private String serialize(SensorParserConfig sensorParserConfig) throws RestException {
+    try {
+      return objectMapper.writeValueAsString(sensorParserConfig);
+    } catch (JsonProcessingException e) {
+      // Pass the exception itself as the cause; e.getCause() may be null and would hide the failure.
+      throw new RestException("Could not serialize SensorParserConfig", "Could not serialize " + sensorParserConfig.toString(), e);
+    }
+  }
+
+  /**
+   * Reads the parser configuration stored for the given sensor; for grok parsers the grok
+   * statement is loaded as well and added to the parser config map.
+   *
+   * @param name sensor name to look up
+   * @return the configuration, or {@code null} when ZooKeeper reports that the node does not exist
+   * @throws RestException wrapping any other failure
+   */
+  @Override
+  public SensorParserConfig findOne(String name) throws RestException {
+    SensorParserConfig sensorParserConfig;
+    try {
+      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client);
+      if (isGrokConfig(sensorParserConfig)) {
+        addGrokStatementToConfig(sensorParserConfig);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      return null;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return sensorParserConfig;
+  }
+
+  /**
+   * Loads the configuration of every sensor listed under the PARSER configuration root.
+   *
+   * @return the configurations in the order ZooKeeper returned the sensor names
+   * @throws RestException if listing the sensors or reading any configuration fails
+   */
+  @Override
+  public Iterable<SensorParserConfig> getAll() throws RestException {
+    List<SensorParserConfig> sensorParserConfigs = new ArrayList<>();
+    List<String> sensorNames = getAllTypes();
+    for (String name : sensorNames) {
+      sensorParserConfigs.add(findOne(name));
+    }
+    return sensorParserConfigs;
+  }
+
+  /**
+   * Deletes the node named {@code name} under the PARSER configuration root.
+   *
+   * @param name sensor name whose node is removed
+   * @return {@code true} when the delete call succeeded, {@code false} when the node did not exist
+   * @throws RestException wrapping any other failure raised while deleting the node
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    try {
+      client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/" + name);
+    } catch (KeeperException.NoNodeException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return true;
+  }
+
+  /** Lists the children of the PARSER configuration root; empty when the root does not exist. */
+  private List<String> getAllTypes() throws RestException {
+    List<String> types;
+    try {
+      types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
+    } catch (KeeperException.NoNodeException e) {
+      types = new ArrayList<>();
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return types;
+  }
+
+  /**
+   * Returns the parsers found on the classpath, scanning on first use and caching the result.
+   *
+   * @return a map from display name (simple class name with "Basic" and "Parser" removed) to
+   *         fully qualified class name; the class named "BasicParser" is excluded
+   */
+  @Override
+  public Map<String, String> getAvailableParsers() {
+    if (availableParsers == null) {
+      // Fill a local map and assign it once complete, so the field never refers to a
+      // partially populated map.
+      Map<String, String> parsers = new HashMap<>();
+      for (Class<? extends MessageParser> parserClass : getParserClasses()) {
+        String simpleName = parserClass.getSimpleName();
+        if (!"BasicParser".equals(simpleName)) {
+          parsers.put(simpleName.replaceAll("Basic|Parser", ""), parserClass.getName());
+        }
+      }
+      availableParsers = parsers;
+    }
+    return availableParsers;
+  }
+
+  /**
+   * Discards the cached parser map and rebuilds it.
+   *
+   * @return the freshly built map, as described for {@link #getAvailableParsers()}
+   */
+  @Override
+  public Map<String, String> reloadAvailableParsers() {
+    availableParsers = null;
+    return getAvailableParsers();
+  }
+
+  /** Finds every subtype of MessageParser in the org.apache.metron.parsers package. */
+  private Set<Class<? extends MessageParser>> getParserClasses() {
+    Reflections reflections = new Reflections("org.apache.metron.parsers");
+    return reflections.getSubTypesOf(MessageParser.class);
+  }
+
+  /**
+   * Instantiates the parser named in the request's configuration, configures it and parses the
+   * request's sample data. Grok parsers get their statement written to a temporary file first;
+   * that file is removed again whether or not parsing succeeds.
+   *
+   * @param parseMessageRequest request carrying the parser configuration and the sample data
+   * @return the first message produced by the parser
+   * @throws RestException if the configuration or parser class name is missing, the parser cannot
+   *         be instantiated, or the temporary grok statement cannot be written
+   */
+  @Override
+  public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException {
+    SensorParserConfig sensorParserConfig = parseMessageRequest.getSensorParserConfig();
+    if (sensorParserConfig == null) {
+      throw new RestException("SensorParserConfig is missing from ParseMessageRequest");
+    }
+    if (sensorParserConfig.getParserClassName() == null) {
+      throw new RestException("SensorParserConfig must have a parserClassName");
+    }
+    MessageParser<JSONObject> parser;
+    try {
+      parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName()).newInstance();
+    } catch (Exception e) {
+      // Keep the caught exception as the cause; e.getCause() may be null.
+      throw new RestException(e.toString(), e);
+    }
+    boolean grokConfig = isGrokConfig(sensorParserConfig);
+    if (grokConfig) {
+      saveTemporaryGrokStatement(sensorParserConfig);
+      sensorParserConfig.getParserConfig().put(MetronRestConstants.GROK_PATH_KEY, new File(getTemporaryGrokRootPath(), sensorParserConfig.getSensorTopic()).toString());
+    }
+    try {
+      parser.configure(sensorParserConfig.getParserConfig());
+      return parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0);
+    } finally {
+      // Clean up even when configure() or parse() throws, so temporary files do not pile up.
+      if (grokConfig) {
+        deleteTemporaryGrokStatement(sensorParserConfig);
+      }
+    }
+  }
+
+  /** Returns true when the configuration's parser class name equals GROK_CLASS_NAME. */
+  private boolean isGrokConfig(SensorParserConfig sensorParserConfig) {
+    return GROK_CLASS_NAME.equals(sensorParserConfig.getParserClassName());
+  }
+
+  /**
+   * Puts the grok statement into the parser config map: the content read from the configured
+   * grok path with the first "&lt;pattern label&gt; " occurrence removed, or an empty string
+   * when no grok path is configured.
+   */
+  private void addGrokStatementToConfig(SensorParserConfig sensorParserConfig) throws RestException {
+    String grokStatement = "";
+    String grokPath = (String) sensorParserConfig.getParserConfig().get(GROK_PATH_KEY);
+    if (grokPath != null) {
+      String fullGrokStatement = getGrokStatement(grokPath);
+      String patternLabel = (String) sensorParserConfig.getParserConfig().get(GROK_PATTERN_LABEL_KEY);
+      // The label is plain text, not a regular expression: quote it so that characters such
+      // as '.' or '$' in a label are matched literally.
+      grokStatement = fullGrokStatement.replaceFirst(Pattern.quote(patternLabel + " "), "");
+    }
+    sensorParserConfig.getParserConfig().put(GROK_STATEMENT_KEY, grokStatement);
+  }
+
+  /**
+   * Sets a grok path (default grok directory plus sensor topic) when the configuration has a
+   * grok statement but no grok path yet.
+   */
+  private void addGrokPathToConfig(SensorParserConfig sensorParserConfig) {
+    if (sensorParserConfig.getParserConfig().get(GROK_PATH_KEY) == null) {
+      String grokStatement = (String) sensorParserConfig.getParserConfig().get(GROK_STATEMENT_KEY);
+      if (grokStatement != null) {
+        sensorParserConfig.getParserConfig().put(GROK_PATH_KEY,
+                new Path(environment.getProperty(GROK_DEFAULT_PATH_SPRING_PROPERTY), sensorParserConfig.getSensorTopic()).toString());
+      }
+    }
+  }
+
+  /** Reads the content stored at the given path through the HDFS service. */
+  private String getGrokStatement(String path) throws RestException {
+    try {
+      return new String(hdfsService.read(new Path(path)));
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+  }
+
+  /** Writes the grok statement to the configured grok path through the HDFS service. */
+  private void saveGrokStatement(SensorParserConfig sensorParserConfig) throws RestException {
+    saveGrokStatement(sensorParserConfig, false);
+  }
+
+  /** Writes the grok statement to a file in the current user's temporary grok directory. */
+  private void saveTemporaryGrokStatement(SensorParserConfig sensorParserConfig) throws RestException {
+    saveGrokStatement(sensorParserConfig, true);
+  }
+
+  /**
+   * Writes "&lt;pattern label&gt; &lt;grok statement&gt;" either through the HDFS service or to a
+   * local temporary file named after the sensor topic.
+   *
+   * @throws RestException if no grok statement is present or the write fails
+   */
+  private void saveGrokStatement(SensorParserConfig sensorParserConfig, boolean isTemporary) throws RestException {
+    String patternLabel = (String) sensorParserConfig.getParserConfig().get(GROK_PATTERN_LABEL_KEY);
+    String grokPath = (String) sensorParserConfig.getParserConfig().get(GROK_PATH_KEY);
+    String grokStatement = (String) sensorParserConfig.getParserConfig().get(GROK_STATEMENT_KEY);
+    if (grokStatement == null) {
+      throw new RestException("A grokStatement must be provided");
+    }
+    String fullGrokStatement = patternLabel + " " + grokStatement;
+    try {
+      if (!isTemporary) {
+        hdfsService.write(new Path(grokPath), fullGrokStatement.getBytes());
+      } else {
+        File grokDirectory = new File(getTemporaryGrokRootPath());
+        if (!grokDirectory.exists()) {
+          grokDirectory.mkdirs();
+        }
+        // try-with-resources closes the writer even when write() fails.
+        try (FileWriter fileWriter = new FileWriter(new File(grokDirectory, sensorParserConfig.getSensorTopic()))) {
+          fileWriter.write(fullGrokStatement);
+        }
+      }
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+  }
+
+  /** Deletes the temporary grok file of the sensor topic; the result of the delete is ignored. */
+  private void deleteTemporaryGrokStatement(SensorParserConfig sensorParserConfig) {
+    File file = new File(getTemporaryGrokRootPath(), sensorParserConfig.getSensorTopic());
+    file.delete();
+  }
+
+  /** Builds the temporary grok directory: the configured temp path plus the authenticated user's name. */
+  private String getTemporaryGrokRootPath() {
+    String grokTempPath = environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY);
+    Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
+    return new Path(grokTempPath, authentication.getName()).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
new file mode 100644
index 0000000..cb7c449
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatusCode;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.StormAdminService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * Starts and stops the parser, enrichment and indexing topologies by delegating to the
+ * Storm CLI wrapper and translating its exit codes into topology responses.
+ */
+@Service
+public class StormAdminServiceImpl implements StormAdminService {
+
+  private StormCLIWrapper stormCLIClientWrapper;
+
+  @Autowired
+  public void setStormCLIClientWrapper(StormCLIWrapper stormCLIClientWrapper) {
+    this.stormCLIClientWrapper = stormCLIClientWrapper;
+  }
+
+  @Autowired
+  private GlobalConfigService globalConfigService;
+
+  @Autowired
+  private SensorParserConfigService sensorParserConfigService;
+
+  /**
+   * Starts the parser topology for a sensor, provided both the global config and the sensor's
+   * parser config exist; otherwise the response carries the matching "missing" error code.
+   */
+  @Override
+  public TopologyResponse startParserTopology(String name) throws RestException {
+    if (globalConfigService.get() == null) {
+      return missingConfigResponse(TopologyStatusCode.GLOBAL_CONFIG_MISSING);
+    }
+    if (sensorParserConfigService.findOne(name) == null) {
+      return missingConfigResponse(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING);
+    }
+    int exitCode = stormCLIClientWrapper.startParserTopology(name);
+    return createResponse(exitCode, TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+  }
+
+  /** Builds an error response whose message is the given status code. */
+  private TopologyResponse missingConfigResponse(TopologyStatusCode missingCode) {
+    TopologyResponse response = new TopologyResponse();
+    response.setErrorMessage(missingCode.toString());
+    return response;
+  }
+
+  /** Stops the named parser topology and reports STOPPED or STOP_ERROR. */
+  @Override
+  public TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException {
+    int exitCode = stormCLIClientWrapper.stopParserTopology(name, stopNow);
+    return createResponse(exitCode, TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+  }
+
+  /** Starts the enrichment topology and reports STARTED or START_ERROR. */
+  @Override
+  public TopologyResponse startEnrichmentTopology() throws RestException {
+    int exitCode = stormCLIClientWrapper.startEnrichmentTopology();
+    return createResponse(exitCode, TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+  }
+
+  /** Stops the enrichment topology and reports STOPPED or STOP_ERROR. */
+  @Override
+  public TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException {
+    int exitCode = stormCLIClientWrapper.stopEnrichmentTopology(stopNow);
+    return createResponse(exitCode, TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+  }
+
+  /** Starts the indexing topology and reports STARTED or START_ERROR. */
+  @Override
+  public TopologyResponse startIndexingTopology() throws RestException {
+    int exitCode = stormCLIClientWrapper.startIndexingTopology();
+    return createResponse(exitCode, TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+  }
+
+  /** Stops the indexing topology and reports STOPPED or STOP_ERROR. */
+  @Override
+  public TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException {
+    int exitCode = stormCLIClientWrapper.stopIndexingTopology(stopNow);
+    return createResponse(exitCode, TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+  }
+
+  /** Maps exit code 0 to the success message and any other exit code to the error message. */
+  private TopologyResponse createResponse(int responseCode, TopologyStatusCode successMessage, TopologyStatusCode errorMessage) {
+    TopologyResponse response = new TopologyResponse();
+    boolean succeeded = responseCode == 0;
+    if (!succeeded) {
+      response.setErrorMessage(errorMessage.toString());
+      return response;
+    }
+    response.setSuccessMessage(successMessage.toString());
+    return response;
+  }
+
+  /** Returns the Storm client status as reported by the CLI wrapper. */
+  @Override
+  public Map<String, String> getStormClientStatus() throws RestException {
+    return stormCLIClientWrapper.getStormClientStatus();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
new file mode 100644
index 0000000..a472515
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
+import static org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME;
+
+/**
+ * Runs the topology start scripts and the {@code storm} command line client as external
+ * processes and reports their exit codes.
+ */
+public class StormCLIWrapper {
+
+  @Autowired
+  private Environment environment;
+
+  /** Runs the parser start script for the given sensor and returns its exit code. */
+  public int startParserTopology(String name) throws RestException {
+    return runCommand(getParserStartCommand(name));
+  }
+
+  /** Runs {@code storm kill} for the given topology name and returns its exit code. */
+  public int stopParserTopology(String name, boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(name, stopNow));
+  }
+
+  /** Runs the enrichment start script and returns its exit code. */
+  public int startEnrichmentTopology() throws RestException {
+    return runCommand(getEnrichmentStartCommand());
+  }
+
+  /** Runs {@code storm kill} for the enrichment topology and returns its exit code. */
+  public int stopEnrichmentTopology(boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(ENRICHMENT_TOPOLOGY_NAME, stopNow));
+  }
+
+  /** Runs the indexing start script and returns its exit code. */
+  public int startIndexingTopology() throws RestException {
+    return runCommand(getIndexingStartCommand());
+  }
+
+  /** Runs {@code storm kill} for the indexing topology and returns its exit code. */
+  public int stopIndexingTopology(boolean stopNow) throws RestException {
+    return runCommand(getStopCommand(INDEXING_TOPOLOGY_NAME, stopNow));
+  }
+
+  /**
+   * Starts the command with inherited I/O, waits for it to finish and returns its exit value.
+   *
+   * @param command program and arguments to execute
+   * @return the exit value of the finished process
+   * @throws RestException if the process cannot be started or the wait is interrupted
+   */
+  protected int runCommand(String[] command) throws RestException {
+    ProcessBuilder pb = getProcessBuilder(command);
+    pb.inheritIO();
+    Process process;
+    try {
+      process = pb.start();
+      process.waitFor();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RestException(e);
+    } catch (Exception e) {
+      throw new RestException(e);
+    }
+    return process.exitValue();
+  }
+
+  /** Builds: parser script, -k kafka broker url, -z zookeeper url, -s sensor name. */
+  protected String[] getParserStartCommand(String name) {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY),
+        "-k",
+        environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY),
+        "-z",
+        environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY),
+        "-s",
+        name
+    };
+  }
+
+  /** Builds a one-element command holding the configured enrichment script path. */
+  protected String[] getEnrichmentStartCommand() {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)
+    };
+  }
+
+  /** Builds a one-element command holding the configured indexing script path. */
+  protected String[] getIndexingStartCommand() {
+    return new String[] {
+        environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)
+    };
+  }
+
+  /** Builds {@code storm kill <name>}, with {@code -w 0} appended when {@code stopNow} is set. */
+  protected String[] getStopCommand(String name, boolean stopNow) {
+    if (stopNow) {
+      return new String[] {"storm", "kill", name, "-w", "0"};
+    }
+    return new String[] {"storm", "kill", name};
+  }
+
+  /** Creates the ProcessBuilder for a command; protected so that subclasses can replace it. */
+  protected ProcessBuilder getProcessBuilder(String... command) {
+    return new ProcessBuilder(command);
+  }
+
+  /**
+   * Reports the configured script paths and the installed Storm client version.
+   *
+   * @return a map with the keys parserScriptPath, enrichmentScriptPath, indexingScriptPath and
+   *         stormClientVersionInstalled
+   * @throws RestException if the {@code storm version} process cannot be started or read
+   */
+  public Map<String, String> getStormClientStatus() throws RestException {
+    Map<String, String> status = new HashMap<>();
+    status.put("parserScriptPath", environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("enrichmentScriptPath", environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("indexingScriptPath", environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY));
+    status.put("stormClientVersionInstalled", stormClientVersionInstalled());
+    return status;
+  }
+
+  /**
+   * Runs {@code storm version} and extracts the version from the second line of its output.
+   *
+   * @return the second output line with a leading "Storm " removed, or
+   *         "Storm client is not installed" when fewer than two lines were produced
+   * @throws RestException if the process cannot be started or its output stream cannot be closed
+   */
+  protected String stormClientVersionInstalled() throws RestException {
+    String stormClientVersionInstalled = "Storm client is not installed";
+    ProcessBuilder pb = getProcessBuilder("storm", "version");
+    pb.redirectErrorStream(true);
+    List<String> lines;
+    // try-with-resources closes the reader (and the process output stream) on every path.
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(pb.start().getInputStream()))) {
+      lines = reader.lines().collect(toList());
+    } catch (IOException e) {
+      throw new RestException(e);
+    }
+    // NOTE(review): echoes the raw CLI output to stdout; consider routing it through a logger.
+    lines.forEach(System.out::println);
+    if (lines.size() > 1) {
+      stormClientVersionInstalled = lines.get(1).replaceFirst("Storm ", "");
+    }
+    return stormClientVersionInstalled;
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
new file mode 100644
index 0000000..280d72a
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
+import org.apache.metron.rest.model.TopologyStatusCode;
+import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.StormStatusService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
+
+/**
+ * Queries the Storm UI REST endpoints for topology summaries and status, and activates or
+ * deactivates topologies by name.
+ */
+@Service
+public class StormStatusServiceImpl implements StormStatusService {
+
+  @Autowired
+  private Environment environment;
+
+  @Autowired
+  private RestTemplate restTemplate;
+
+  /** Fetches the topology summary from the Storm UI. */
+  @Override
+  public TopologySummary getTopologySummary() {
+    return restTemplate.getForObject(stormUiUrl(TOPOLOGY_SUMMARY_URL), TopologySummary.class);
+  }
+
+  /**
+   * Fetches the status of the topology with the given name.
+   *
+   * @param name topology name to look up in the topology summary
+   * @return the status returned by the Storm UI, or {@code null} when no topology with that
+   *         name (and a non-null id) is listed
+   */
+  @Override
+  public TopologyStatus getTopologyStatus(String name) {
+    String id = findTopologyId(name);
+    if (id == null) {
+      return null;
+    }
+    return fetchTopologyStatus(id);
+  }
+
+  /** Fetches the status of every topology listed in the topology summary. */
+  @Override
+  public List<TopologyStatus> getAllTopologyStatus() {
+    List<TopologyStatus> topologyStatus = new ArrayList<>();
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      topologyStatus.add(fetchTopologyStatus(topology.getId()));
+    }
+    return topologyStatus;
+  }
+
+  /**
+   * Activates the named topology.
+   *
+   * @return a response with the ACTIVE success message, the status reported by the Storm UI as
+   *         error message, or TOPOLOGY_NOT_FOUND when the name is not listed
+   */
+  @Override
+  public TopologyResponse activateTopology(String name) {
+    return changeTopologyState(name, "activate", TopologyStatusCode.ACTIVE);
+  }
+
+  /**
+   * Deactivates the named topology.
+   *
+   * @return a response with the INACTIVE success message, the status reported by the Storm UI
+   *         as error message, or TOPOLOGY_NOT_FOUND when the name is not listed
+   */
+  @Override
+  public TopologyResponse deactivateTopology(String name) {
+    return changeTopologyState(name, "deactivate", TopologyStatusCode.INACTIVE);
+  }
+
+  /** Builds "http://" + configured Storm UI address + path. */
+  private String stormUiUrl(String path) {
+    return "http://" + environment.getProperty(STORM_UI_SPRING_PROPERTY) + path;
+  }
+
+  /** Fetches the status of the topology with the given id from the Storm UI. */
+  private TopologyStatus fetchTopologyStatus(String id) {
+    return restTemplate.getForObject(stormUiUrl(TOPOLOGY_URL + "/" + id), TopologyStatus.class);
+  }
+
+  /** Returns the id of the first listed topology with the given name, or null when none matches. */
+  private String findTopologyId(String name) {
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      if (name.equals(topology.getName())) {
+        return topology.getId();
+      }
+    }
+    return null;
+  }
+
+  /** Posts the given action for the named topology and maps the "status" field to a response. */
+  private TopologyResponse changeTopologyState(String name, String action, TopologyStatusCode successCode) {
+    TopologyResponse topologyResponse = new TopologyResponse();
+    String id = findTopologyId(name);
+    if (id == null) {
+      topologyResponse.setErrorMessage(TopologyStatusCode.TOPOLOGY_NOT_FOUND.toString());
+      return topologyResponse;
+    }
+    Map<?, ?> result = restTemplate.postForObject(stormUiUrl(TOPOLOGY_URL + "/" + id + "/" + action), null, Map.class);
+    // postForObject returns null for an empty response body; treat that as a missing status
+    // instead of failing with a NullPointerException.
+    Object status = result == null ? null : result.get("status");
+    if ("success".equals(status)) {
+      topologyResponse.setSuccessMessage(successCode.toString());
+    } else {
+      topologyResponse.setErrorMessage((String) status);
+    }
+    return topologyResponse;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/55b3e7ea/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
new file mode 100644
index 0000000..b26dc82
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/TransformationServiceImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.StellarFunctionInfo;
+import org.apache.metron.common.dsl.functions.resolver.SingletonFunctionResolver;
+import org.apache.metron.common.field.transformation.FieldTransformations;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.rest.model.StellarFunctionDescription;
+import org.apache.metron.rest.model.TransformationValidation;
+import org.apache.metron.rest.service.TransformationService;
+import org.json.simple.JSONObject;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Validates Stellar rules, applies field transformations to sample data and describes the
+ * available transformations and Stellar functions.
+ */
+@Service
+public class TransformationServiceImpl implements TransformationService {
+
+  /**
+   * Validates each rule with a Stellar processor.
+   *
+   * @param rules rules to validate
+   * @return a map from rule to the validation result; a rule that raises a ParseException maps to false
+   */
+  @Override
+  public Map<String, Boolean> validateRules(List<String> rules) {
+    StellarProcessor processor = new StellarProcessor();
+    Map<String, Boolean> validity = new HashMap<>();
+    for (String rule : rules) {
+      boolean valid;
+      try {
+        valid = processor.validate(rule, Context.EMPTY_CONTEXT());
+      } catch (ParseException e) {
+        valid = false;
+      }
+      validity.put(rule, valid);
+    }
+    return validity;
+  }
+
+  /**
+   * Applies every field transformation of the request's parser config to a copy of the sample data.
+   *
+   * @param transformationValidation request carrying the sample data and the parser config
+   * @return the sample data after all transformations were applied
+   */
+  @Override
+  public Map<String, Object> validateTransformation(TransformationValidation transformationValidation) {
+    JSONObject sampleJson = new JSONObject(transformationValidation.getSampleData());
+    transformationValidation.getSensorParserConfig().getFieldTransformations().forEach(fieldTransformer ->
+        fieldTransformer.transformAndUpdate(sampleJson, transformationValidation.getSensorParserConfig().getParserConfig(), Context.EMPTY_CONTEXT()));
+    return sampleJson;
+  }
+
+  /** Returns all values of the FieldTransformations enum. */
+  @Override
+  public FieldTransformations[] getTransformations() {
+    return FieldTransformations.values();
+  }
+
+  /** Describes every function known to the singleton function resolver (name, description, params, returns). */
+  @Override
+  public List<StellarFunctionDescription> getStellarFunctions() {
+    List<StellarFunctionDescription> descriptions = new ArrayList<>();
+    Iterable<StellarFunctionInfo> functionInfos = SingletonFunctionResolver.getInstance().getFunctionInfo();
+    for (StellarFunctionInfo functionInfo : functionInfos) {
+      StellarFunctionDescription description = new StellarFunctionDescription(
+          functionInfo.getName(),
+          functionInfo.getDescription(),
+          functionInfo.getParams(),
+          functionInfo.getReturns());
+      descriptions.add(description);
+    }
+    return descriptions;
+  }
+
+  /** Returns the functions that take exactly one parameter, sorted by name. */
+  @Override
+  public List<StellarFunctionDescription> getSimpleStellarFunctions() {
+    return getStellarFunctions().stream()
+        .filter(description -> description.getParams().length == 1)
+        .sorted((left, right) -> left.getName().compareTo(right.getName()))
+        .collect(Collectors.toList());
+  }
+
+}