http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testStreamRouterBoltSpec.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testStreamRouterBoltSpec.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testStreamRouterBoltSpec.json deleted file mode 100644 index f4e72bf..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testStreamRouterBoltSpec.json +++ /dev/null @@ -1,123 +0,0 @@ -{ - "version": null, - "topologyName": "testTopology", - "routerSpecs": [ - { - "streamId": "testTopic3Stream", - "partition": { - "streamId": "testTopic3Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "targetQueue": [ - { - "partition": { - "streamId": "testTopic3Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "workers": [ - { - "topologyName": "testTopology", - "boltId": "alertBolt0" - }, - { - "topologyName": "testTopology", - "boltId": "alertBolt1" - } - ] - } - ] - }, - { - "streamId": "testTopic4Stream", - "partition": { - "streamId": "testTopic4Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "targetQueue": [ - { - "partition": { - "streamId": "testTopic4Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "workers": [ - { - "topologyName": "testTopology", - "boltId": "alertBolt0" - }, - { - "topologyName": "testTopology", - "boltId": "alertBolt1" - } - ] - } - ] - }, - { - "streamId": "testTopic5Stream", - "partition": { - "streamId": "testTopic5Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "targetQueue": [ - { - "partition": { - "streamId": "testTopic5Stream", - "type": "GROUPBY", - "columns": [ - "value" - ], - "sortSpec": { - "windowPeriod": "PT10S", - "windowMargin": 1000 - } - }, - "workers": [ - { - "topologyName": "testTopology", - "boltId": "alertBolt0" - }, - { - "topologyName": "testTopology", - "boltId": "alertBolt1" - } - ] - } - ] - } - ] -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topic.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topic.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topic.json deleted file mode 100644 index b49d6ad..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topic.json +++ /dev/null @@ -1 +0,0 @@ -nn_jmx_metric_sandbox \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/.gitignore deleted file mode 100644 index 1dd3331..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target/ -/target/ http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml deleted file mode 100644 index 26613d5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml +++ /dev/null @@ -1,125 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one - or more ~ * contributor license agreements. See the NOTICE file distributed - with ~ * this work for additional information regarding copyright ownership. - ~ * The ASF licenses this file to You under the Apache License, Version 2.0 - ~ * (the "License"); you may not use this file except in compliance with - ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0 - ~ * ~ * Unless required by applicable law or agreed to in writing, software - ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ * - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ * See the License for the specific language governing permissions and ~ - * limitations under the License. ~ */ --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-metadata-parent</artifactId> - <version>0.5.0-SNAPSHOT</version> - </parent> - - <artifactId>alert-metadata-service</artifactId> - <name>Eagle::Core::Alert::MetadataService</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-engine</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-metadata</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey.contribs</groupId> - <artifactId>jersey-multipart</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-servlet</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>com.typesafe</groupId> - <artifactId>config</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tomcat.embed</groupId> - <artifactId>tomcat-embed-core</artifactId> - </dependency> - <dependency> - <groupId>io.swagger</groupId> - <artifactId>swagger-jaxrs</artifactId> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-maven-plugin</artifactId> - <configuration> - <scanIntervalSeconds>5</scanIntervalSeconds> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.1.2</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>verify</phase> - <goals> - <goal>jar-no-fork</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java deleted file mode 100644 index 36a3044..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ /dev/null @@ -1,525 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.metadata.resource; - -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter; -import org.apache.eagle.alert.engine.interpreter.PolicyParseResult; -import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.engine.publisher.PublishementTypeLoader; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; -import org.apache.eagle.alert.metadata.resource.Models; -import org.apache.eagle.alert.metadata.resource.OpResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.validation.Valid; -import javax.ws.rs.*; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * @since Apr 11, 2016. - */ -@Path("/metadata") -@Produces("application/json") -@Consumes("application/json") -public class MetadataResource { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataResource.class); - - // private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); - private final IMetadataDao dao; - - public MetadataResource() { - this.dao = MetadataDaoFactory.getInstance().getMetadataDao(); - } - - @Inject - public MetadataResource(IMetadataDao dao) { - this.dao = dao; - } - - @Path("/clusters") - @GET - public List<StreamingCluster> listClusters() { - return dao.listClusters(); - } - - @Path("/clear") - @POST - public OpResult clear() { - return dao.clear(); - } - - @Path("/clear/schedulestates") - @POST - public OpResult clearScheduleStates(int capacity) { - return dao.clearScheduleState(capacity); - } - - @Path("/export") - @POST - public Models export() { - return dao.export(); - } - - @Path("/import") - @POST - public OpResult importModels(Models model) { - return dao.importModels(model); - } - - @Path("/clusters") - @POST - public OpResult addCluster(StreamingCluster cluster) { - return dao.addCluster(cluster); - } - - @Path("/clusters/batch") - @POST - public List<OpResult> addClusters(List<StreamingCluster> clusters) { - List<OpResult> results = new LinkedList<>(); - for (StreamingCluster cluster : clusters) { - results.add(dao.addCluster(cluster)); - } - return results; - } - - @Path("/clusters/{clusterId}") - @DELETE - public OpResult removeCluster(@PathParam("clusterId") String clusterId) { - return dao.removeCluster(clusterId); - } - - @Path("/clusters") - @DELETE - public List<OpResult> removeClusters(List<String> clusterIds) { - List<OpResult> results = new LinkedList<>(); - for (String cluster : clusterIds) { - results.add(dao.removeCluster(cluster)); - } - return results; - } - - @Path("/streams") - @GET - public List<StreamDefinition> listStreams(@QueryParam("siteId") String siteId) { - if (siteId == null) { - return dao.listStreams(); - } else { - return dao.listStreams().stream() - .filter((streamDefinition -> streamDefinition.getSiteId().equals(siteId))) - .collect(Collectors.toList()); - } - } - - @Path("/streams") - @POST - public OpResult createStream(StreamDefinition stream) { - return dao.createStream(stream); - } - - @Path("/streams/create") - @POST - public OpResult createStream(StreamDefinitionWrapper stream) { - Preconditions.checkNotNull(stream.getStreamDefinition(),"Stream definition is null"); - Preconditions.checkNotNull(stream.getStreamSource(),"Stream source is null"); - stream.validateAndEnsureDefault(); - OpResult createStreamResult = dao.createStream(stream.getStreamDefinition()); - OpResult createDataSourceResult = dao.addDataSource(stream.getStreamSource()); - // TODO: Check kafka topic exist or not. - if (createStreamResult.code == OpResult.SUCCESS - && createDataSourceResult.code == OpResult.SUCCESS) { - return OpResult.success("Successfully create stream " - + stream.getStreamDefinition().getStreamId() - + ", and datasource " - + stream.getStreamSource().getName()); - } else { - return OpResult.fail("Error: " - + StringUtils.join(new String[]{createDataSourceResult.message, createDataSourceResult.message},",")); - } - } - - @Path("/streams/batch") - @POST - public List<OpResult> addStreams(List<StreamDefinition> streams) { - List<OpResult> results = new LinkedList<>(); - for (StreamDefinition stream : streams) { - results.add(dao.createStream(stream)); - } - return results; - } - - @Path("/streams/{streamId}") - @DELETE - public OpResult removeStream(@PathParam("streamId") String streamId) { - return dao.removeStream(streamId); - } - - @Path("/streams") - @DELETE - public List<OpResult> removeStreams(List<String> streamIds) { - List<OpResult> results = new LinkedList<>(); - for (String streamId : streamIds) { - results.add(dao.removeStream(streamId)); - } - return results; - } - - @Path("/datasources") - @GET - public List<Kafka2TupleMetadata> listDataSources() { - return dao.listDataSources(); - } - - @Path("/datasources") - @POST - public OpResult addDataSource(Kafka2TupleMetadata dataSource) { - return dao.addDataSource(dataSource); - } - - @Path("/datasources/batch") - @POST - public List<OpResult> addDataSources(List<Kafka2TupleMetadata> datasources) { - List<OpResult> results = new LinkedList<>(); - for (Kafka2TupleMetadata ds : datasources) { - results.add(dao.addDataSource(ds)); - } - return results; - } - - @Path("/datasources/{datasourceId}") - @DELETE - public OpResult removeDataSource(@PathParam("datasourceId") String datasourceId) { - return dao.removeDataSource(datasourceId); - } - - @Path("/datasources") - @DELETE - public List<OpResult> removeDataSources(List<String> datasourceIds) { - List<OpResult> results = new LinkedList<>(); - for (String ds : datasourceIds) { - results.add(dao.removeDataSource(ds)); - } - return results; - } - - @Path("/policies") - @GET - public List<PolicyDefinition> listPolicies(@QueryParam("siteId") String siteId) { - if (siteId != null) { - return dao.getPoliciesBySiteId(siteId); - } else { - return dao.listPolicies(); - } - } - - @Path("/policies") - @POST - public OpResult addPolicy(@Valid PolicyDefinition policy) { - PolicyValidationResult validationResult = this.validatePolicy(policy); - if (validationResult.isSuccess()) { - return dao.addPolicy(policy); - } else { - return OpResult.fail(validationResult.getMessage()); - } - } - - @Path("/policies/validate") - @POST - public PolicyValidationResult validatePolicy(PolicyDefinition policy) { - Map<String, StreamDefinition> allDefinitions = new HashMap<>(); - for (StreamDefinition definition : dao.listStreams()) { - allDefinitions.put(definition.getStreamId(), definition); - } - return PolicyInterpreter.validate(policy, allDefinitions); - } - - @Path("/policies/parse") - @POST - public PolicyParseResult parsePolicy(String policyDefinition) { - return PolicyInterpreter.parse(policyDefinition); - } - - @Path("/policies/batch") - @POST - public List<OpResult> addPolicies(List<PolicyDefinition> policies) { - List<OpResult> results = new LinkedList<>(); - for (PolicyDefinition policy : policies) { - results.add(dao.addPolicy(policy)); - } - return results; - } - - @Path("/policies/{policyId}") - @DELETE - public OpResult removePolicy(@PathParam("policyId") String policyId) { - return dao.removePolicy(policyId); - } - - @Path("/policies/{policyId}/alerts") - @GET - public List<AlertPublishEvent> getAlertPublishEventByPolicyId(@PathParam("policyId") String policyId, - @QueryParam("size") int size) { - return dao.getAlertPublishEventsByPolicyId(policyId, size); - } - - @Path("/policies/{policyId}/publishments") - @GET - public List<Publishment> getPolicyPublishments(@PathParam("policyId") String policyId) { - return dao.getPublishmentsByPolicyId(policyId); - } - - @Path("/policies/{policyId}/publishments") - @POST - public OpResult addPublishmentsToPolicy(@PathParam("policyId") String policyId, List<String> publishmentIds) { - return dao.addPublishmentsToPolicy(policyId, publishmentIds); - } - - @Path("/policies/{policyId}") - @GET - public PolicyDefinition getPolicyById(@PathParam("policyId") String policyId) { - Preconditions.checkNotNull(policyId, "policyId"); - return dao.getPolicyById(policyId); - } - - @Path("/policies/{policyId}/status/{status}") - @POST - public OpResult updatePolicyStatusByID(@PathParam("policyId") String policyId, @PathParam("status") PolicyDefinition.PolicyStatus status) { - OpResult result = new OpResult(); - try { - PolicyDefinition policyDefinition = getPolicyById(policyId); - policyDefinition.setPolicyStatus(status); - OpResult updateResult = addPolicy(policyDefinition); - result.code = updateResult.code; - - if (result.code == OpResult.SUCCESS) { - result.message = "Successfully updated status of " + policyId + " as " + status; - LOG.info(result.message); - } else { - result.message = updateResult.message; - LOG.error(result.message); - } - } catch (Exception e) { - LOG.error("Error: " + e.getMessage(), e); - result.code = OpResult.FAILURE; - result.message = e.getMessage(); - } - return result; - } - - @Path("/policies") - @DELETE - public List<OpResult> removePolicies(List<String> policies) { - List<OpResult> results = new LinkedList<>(); - for (String policy : policies) { - results.add(dao.removePolicy(policy)); - } - return results; - } - - @Path("/publishments") - @GET - public List<Publishment> listPublishment() { - return dao.listPublishment(); - } - - @Path("/publishments") - @POST - public OpResult addPublishment(Publishment publishment) { - return dao.addPublishment(publishment); - } - - @Path("/publishments/batch") - @POST - public List<OpResult> addPublishments(List<Publishment> publishments) { - List<OpResult> results = new LinkedList<>(); - for (Publishment publishment : publishments) { - results.add(dao.addPublishment(publishment)); - } - return results; - } - - @Path("/publishments/{name}") - @DELETE - public OpResult removePublishment(@PathParam("name") String pubId) { - return dao.removePublishment(pubId); - } - - @Path("/publishments") - @DELETE - public List<OpResult> removePublishments(List<String> pubIds) { - List<OpResult> results = new LinkedList<>(); - for (String pub : pubIds) { - results.add(dao.removePublishment(pub)); - } - return results; - } - - @Path("/publishmentTypes") - @GET - public List<PublishmentType> listPublishmentType() { - return PublishementTypeLoader.loadPublishmentTypes(); - } - - @Path("/publishmentTypes") - @POST - @Deprecated - public OpResult addPublishmentType(PublishmentType publishmentType) { - return dao.addPublishmentType(publishmentType); - } - - @Path("/publishmentTypes/batch") - @POST - @Deprecated - public List<OpResult> addPublishmentTypes(List<PublishmentType> publishmentTypes) { - List<OpResult> results = new LinkedList<>(); - for (PublishmentType pubType : publishmentTypes) { - results.add(dao.addPublishmentType(pubType)); - } - return results; - } - - @Path("/publishmentTypes/{name}") - @DELETE - @Deprecated - public OpResult removePublishmentType(@PathParam("name") String name) { - return dao.removePublishmentType(name); - } - - @Path("/publishmentTypes") - @DELETE - @Deprecated - public List<OpResult> removePublishmentTypes(List<String> pubTypes) { - List<OpResult> results = new LinkedList<>(); - for (String pubType : pubTypes) { - results.add(dao.removePublishmentType(pubType)); - } - return results; - } - - @Path("/schedulestates/{versionId}") - @GET - public ScheduleState listScheduleState(@PathParam("versionId") String versionId) { - return dao.getScheduleState(versionId); - } - - @Path("/schedulestates") - @GET - public ScheduleState latestScheduleState() { - return dao.getScheduleState(); - } - - @Path("/schedulestates") - @POST - public OpResult addScheduleState(ScheduleState state) { - return dao.addScheduleState(state); - } - - @Path("/assignments") - @GET - public List<PolicyAssignment> listAssignmenets() { - return dao.listAssignments(); - } - - @Path("/assignments") - @POST - public OpResult addAssignmenet(PolicyAssignment pa) { - return dao.addAssignment(pa); - } - - @Path("/topologies") - @GET - public List<Topology> listTopologies() { - return dao.listTopologies(); - } - - @Path("/topologies") - @POST - public OpResult addTopology(Topology t) { - return dao.addTopology(t); - } - - @Path("/topologies/batch") - @POST - public List<OpResult> addTopologies(List<Topology> topologies) { - List<OpResult> results = new LinkedList<>(); - for (Topology t : topologies) { - results.add(dao.addTopology(t)); - } - return results; - } - - @Path("/alerts") - @POST - public OpResult addAlertPublishEvent(AlertPublishEvent event) { - return dao.addAlertPublishEvent(event); - } - - @Path("/alerts/batch") - @POST - public List<OpResult> addAlertPublishEvents(List<AlertPublishEvent> events) { - List<OpResult> results = new LinkedList<>(); - for (AlertPublishEvent e : events) { - results.add(dao.addAlertPublishEvent(e)); - } - return results; - } - - @Path("/alerts") - @GET - public List<AlertPublishEvent> listAlertPublishEvents(@QueryParam("size") int size) { - return dao.listAlertPublishEvent(size); - } - - @Path("/alerts/{alertId}") - @GET - public AlertPublishEvent getAlertPublishEvent(@PathParam("alertId") String alertId) { - return dao.getAlertPublishEvent(alertId); - } - - @Path("/topologies/{topologyName}") - @DELETE - public OpResult removeTopology(@PathParam("topologyName") String topologyName) { - return dao.removeTopology(topologyName); - } - - @Path("/topologies") - @DELETE - public List<OpResult> removeTopologies(List<String> topologies) { - List<OpResult> results = new LinkedList<>(); - for (String t : topologies) { - results.add(dao.removeTopology(t)); - } - return results; - } - - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java deleted file mode 100644 index 738c978..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/StreamDefinitionWrapper.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.service.metadata.resource; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector; - -import java.util.Properties; - -public class StreamDefinitionWrapper { - private Kafka2TupleMetadata streamSource; - private StreamDefinition streamDefinition; - - public Kafka2TupleMetadata getStreamSource() { - return streamSource; - } - - public void setStreamSource(Kafka2TupleMetadata streamSource) { - this.streamSource = streamSource; - } - - public StreamDefinition getStreamDefinition() { - return streamDefinition; - } - - public void setStreamDefinition(StreamDefinition streamDefinition) { - this.streamDefinition = streamDefinition; - } - - public void validateAndEnsureDefault() { - Preconditions.checkNotNull(streamSource); - Preconditions.checkNotNull(streamDefinition); - if (streamSource.getType() == null) { - streamSource.setType("KAFKA"); - } - String dataSourceName = (getStreamDefinition().getStreamId() + "_CUSTOMIZED").toUpperCase(); - getStreamDefinition().setDataSource(dataSourceName); - getStreamSource().setName(dataSourceName); - Tuple2StreamMetadata codec = new Tuple2StreamMetadata(); - codec.setTimestampColumn("timestamp"); - codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName()); - Properties streamNameSelectorProp = new Properties(); - streamNameSelectorProp.put("userProvidedStreamName", streamSource.getName()); - codec.setStreamNameSelectorProp(streamNameSelectorProp); - if (StringUtils.isBlank(codec.getStreamNameSelectorCls())) { - codec.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getName()); - } - if (StringUtils.isBlank(codec.getTimestampFormat())) { - codec.setTimestampFormat(null); - } - this.streamSource.setCodec(codec); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java deleted file mode 100644 index 5626321..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.topology.resource; - -import org.apache.eagle.alert.metadata.resource.OpResult; -import org.apache.eagle.service.topology.resource.impl.TopologyMgmtResourceImpl; -import org.apache.eagle.service.topology.resource.impl.TopologyStatus; - -import java.util.List; -import javax.ws.rs.*; - -/** - * @since May 5, 2016. - */ -@Path("/alert") -@Produces("application/json") -@Consumes("application/json") -public class TopologyMgmtResource { - private TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl(); - - @POST - @Path("/topologies/{topologyName}/start") - public OpResult startTopology(@PathParam("topologyName") String topologyName) { - OpResult result = new OpResult(); - try { - topologyManager.startTopology(topologyName); - } catch (Exception ex) { - result.message = ex.toString(); - } - return result; - } - - @POST - @Path("/topologies/{topologyName}/stop") - public OpResult stopTopology(@PathParam("topologyName") String topologyName) { - OpResult result = new OpResult(); - try { - topologyManager.stopTopology(topologyName); - } catch (Exception ex) { - result.message = ex.toString(); - } - return result; - } - - @GET - @Path("/topologies") - public List<TopologyStatus> getTopologies() throws Exception { - return topologyManager.getTopologies(); - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java deleted file mode 100644 index 3c6f5a5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.topology.resource.impl; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; -import java.util.Optional; - -public class TopologyMgmtResourceHelper { - private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceHelper.class); - - public static <T> Optional<T> findById(List<T> clusters, String id) { - Optional<T> pptionValue = clusters.stream().filter(o -> getName(o).equalsIgnoreCase(id)).findFirst(); - return pptionValue; - } - - public static <T> String getName(T t) { - try { - Method m = t.getClass().getMethod("getName"); - return (String) m.invoke(t); - } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException - | IllegalArgumentException e) { - LOG.error(" getName not found on given class :" + t.getClass().getName()); - } - throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass() - .getName())); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java deleted file mode 100644 index 1bf810f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.topology.resource.impl; - -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.UnitTopologyMain; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.engine.runner.UnitTopologyRunner; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; -import backtype.storm.Config; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.Nimbus; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.TopologySummary; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; -import com.typesafe.config.ConfigFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; - - -public class TopologyMgmtResourceImpl { - private static final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class); - - private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com"; - private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627; - private static final String STORM_JAR_PATH = "topology.stormJarPath"; - - - @SuppressWarnings( {"rawtypes", "unchecked"}) - private Map getStormConf(List<StreamingCluster> clusters, String clusterId) throws Exception { - Map<String, Object> stormConf = Utils.readStormConfig(); - if (clusterId == null) { - stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST); - stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT); - } else { - if (clusters == null) { - clusters = dao.listClusters(); - } - Optional<StreamingCluster> scOp = TopologyMgmtResourceHelper.findById(clusters, clusterId); - StreamingCluster cluster; - if (scOp.isPresent()) { - cluster = scOp.get(); - } else { - throw new Exception("Fail to find cluster: " + clusterId); - } - stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST)); - stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT))); - } - return stormConf; - } - - private void createTopologyHelper(Topology topologyDef, com.typesafe.config.Config config) { - int numOfSpoutTasks = config.getInt(UnitTopologyRunner.SPOUT_TASK_NUM); - int numOfRouterBolts = config.getInt(UnitTopologyRunner.ROUTER_TASK_NUM); - int numOfAlertBolts = config.getInt(UnitTopologyRunner.ALERT_TASK_NUM); - int numOfPublishTasks = config.getInt(UnitTopologyRunner.PUBLISH_TASK_NUM); - topologyDef.setSpoutId(UnitTopologyRunner.spoutName); - topologyDef.setPubBoltId(UnitTopologyRunner.alertPublishBoltName); - topologyDef.setNumOfSpout(numOfSpoutTasks); - topologyDef.setNumOfGroupBolt(numOfRouterBolts); - topologyDef.setNumOfAlertBolt(numOfAlertBolts); - topologyDef.setNumOfPublishBolt(numOfPublishTasks); - dao.addTopology(topologyDef); - } - - private StormTopology createTopology(Topology topologyDef) { - com.typesafe.config.Config topologyConf = ConfigFactory.load("topology-sample-definition.conf"); - String stormJarPath = ""; - if (topologyConf.hasPath(STORM_JAR_PATH)) { - stormJarPath = topologyConf.getString(STORM_JAR_PATH); - } - System.setProperty("storm.jar", stormJarPath); - createTopologyHelper(topologyDef, topologyConf); - return UnitTopologyMain.createTopology(topologyConf); - } - - public void startTopology(String topologyName) throws Exception { - Optional<Topology> tdop = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName); - Topology topologyDef; - if (tdop.isPresent()) { - topologyDef = tdop.get(); - } else { - topologyDef = new Topology(); - topologyDef.setName(topologyName); - } - StormSubmitter.submitTopology(topologyName, getStormConf(null, topologyDef.getClusterName()), createTopology(topologyDef)); - } - - public void stopTopology(String topologyName) throws Exception { - Optional<Topology> tdop = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName); - Topology topologyDef; - if (tdop.isPresent()) { - topologyDef = tdop.get(); - } else { - throw new Exception("Fail to find topology " + topologyName); - } - Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConf(null, topologyDef.getClusterName())).getClient(); - stormClient.killTopology(topologyName); - } - - @SuppressWarnings( {"rawtypes", "unused"}) - private TopologySummary getTopologySummery(List<StreamingCluster> clusters, Topology topologyDef) throws Exception { - Map stormConf = getStormConf(clusters, topologyDef.getClusterName()); - Nimbus.Client stormClient = NimbusClient.getConfiguredClient(stormConf).getClient(); - Optional<TopologySummary> tOp = stormClient.getClusterInfo().get_topologies().stream().filter(topology -> topology.get_name().equalsIgnoreCase(topologyDef.getName())).findFirst(); - if (tOp.isPresent()) { - String id = tOp.get().get_id(); - //StormTopology stormTopology= stormClient.getTopology(id); - return tOp.get(); - } else { - return null; - } - } - - public List<TopologyStatus> getTopologies() throws Exception { - List<Topology> topologyDefinitions = dao.listTopologies(); - List<StreamingCluster> clusters = dao.listClusters(); - - List<TopologyStatus> topologies = new ArrayList<>(); - for (Topology topologyDef : topologyDefinitions) { - TopologySummary topologySummary = getTopologySummery(clusters, topologyDef); - if (topologySummary != null) { - TopologyStatus t = new TopologyStatus(); - t.setName(topologySummary.get_name()); - t.setId(topologySummary.get_id()); - t.setState(topologySummary.get_status()); - t.setTopology(topologyDef); - topologies.add(t); - } - } - return topologies; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java deleted file mode 100644 index c3381d4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.topology.resource.impl; - -import org.apache.eagle.alert.coordination.model.internal.Topology; - -import java.util.HashMap; -import java.util.Map; - -public class TopologyStatus { - private String name; - private String id; - private String state; - private Topology topology; - - private Map<String, Double> spoutLoad = new HashMap<>(); - private Map<String, Double> boltLoad = new HashMap<>(); - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - public Map<String, Double> getSpoutLoad() { - return spoutLoad; - } - - public void setSpoutLoad(Map<String, Double> spoutLoad) { - this.spoutLoad = spoutLoad; - } - - public Map<String, Double> getBoltLoad() { - return boltLoad; - } - - public void setBoltLoad(Map<String, Double> boltLoad) { - this.boltLoad = boltLoad; - } - - public Topology getTopology() { - return topology; - } - - public void setTopology(Topology topology) { - this.topology = topology; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java deleted file mode 100644 index b9a7634..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.topology.resource.impl; - -import backtype.storm.StormSubmitter; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.Nimbus; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.TopologySummary; -import backtype.storm.utils.NimbusClient; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl; -import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.powermock.api.mockito.PowerMockito.when; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({TopologyMgmtResourceImpl.class, StormSubmitter.class}) -public class TopologyMgmtResourceImplTest { - TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl(); - String topologyName = "testStartTopology"; - - @Ignore - @Test - public void testStartTopology() throws Exception { - topologyManager.startTopology(topologyName); - Thread.sleep(10000); - } - - @Ignore - @Test - public void testStopTopology() throws Exception { - topologyManager.startTopology(topologyName); - Thread.sleep(10000); - topologyManager.stopTopology(topologyName); - } - - @Ignore - @Test - public void testGetTopologies() throws Exception { - topologyManager.startTopology(topologyName); - Thread.sleep(10000); - List<TopologyStatus> topologies = topologyManager.getTopologies(); - Assert.assertTrue(topologies.size() == 1); - } - - @Test - public void testGetTopologies1() throws Exception { - IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); - TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl(); - Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao"); - daoField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL); - daoField.set(null, dao); - // set data - Topology topology = new Topology("test", 1, 1); - StreamingCluster cluster =new StreamingCluster(); - dao.clear(); - dao.addTopology(topology); - dao.addCluster(cluster); - TopologyMgmtResourceImpl spy = PowerMockito.spy(service); - PowerMockito.doReturn(new TopologySummary()).when(spy,"getTopologySummery", Mockito.anyCollection(), Mockito.any(Topology.class)); - Assert.assertEquals(1, spy.getTopologies().size()); - } - - @Test - public void testStartTopology1() throws Exception { - IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); - TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl(); - Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao"); - daoField.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL); - daoField.set(null, dao); - // set data - Topology topology = new Topology("test", 1, 1); - StreamingCluster cluster =new StreamingCluster(); - dao.clear(); - dao.addTopology(topology); - dao.addCluster(cluster); - PowerMockito.mockStatic(StormSubmitter.class); - PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",Mockito.eq("test"), Mockito.anyMap(), Mockito.any(StormTopology.class)); - TopologyMgmtResourceImpl spy = PowerMockito.spy(service); - PowerMockito.doReturn(null).when(spy,"createTopology", Mockito.any(Topology.class)); - spy.startTopology("test"); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf deleted file mode 100644 index 1b6a281..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -metadata { - metadataDao = org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl - jdbc { - url = "localhost:27017" - } - properties { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/.gitignore deleted file mode 100644 index 1dd3331..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target/ -/target/ http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml deleted file mode 100644 index 8c2f72f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml +++ /dev/null @@ -1,102 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one - or more ~ * contributor license agreements. See the NOTICE file distributed - with ~ * this work for additional information regarding copyright ownership. - ~ * The ASF licenses this file to You under the Apache License, Version 2.0 - ~ * (the "License"); you may not use this file except in compliance with - ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0 - ~ * ~ * Unless required by applicable law or agreed to in writing, software - ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ * - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ * See the License for the specific language governing permissions and ~ - * limitations under the License. ~ */ --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-metadata-parent</artifactId> - <version>0.5.0-SNAPSHOT</version> - </parent> - - <artifactId>alert-metadata</artifactId> - <name>Eagle::Core::Alert::MetadataBase</name> - <packaging>jar</packaging> - - <dependencies> - <!-- Storm depends on org.ow2.asm:asm:4.0 --> - <!-- Jersey depends on asm:asm:3.0 --> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>alert-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - <version>${mongodb.version}</version> - </dependency> - <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> - </dependency> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>${mysql-connector-java.version}</version> - </dependency> - <dependency> - <groupId>org.apache.ddlutils</groupId> - <artifactId>ddlutils</artifactId> - <version>${ddlutils.version}</version> - </dependency> - <dependency> - <groupId>de.flapdoodle.embed</groupId> - <artifactId>de.flapdoodle.embed.mongo</artifactId> - <version>1.50.5</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - <version>3.0</version> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito</artifactId> - <version>${powermock.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.1.2</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>verify</phase> - <goals> - <goal>jar-no-fork</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java deleted file mode 100644 index 2d2a90f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.metadata; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.metadata.resource.Models; -import org.apache.eagle.alert.metadata.resource.OpResult; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public interface IMetadataDao extends Closeable { - - List<Topology> listTopologies(); - - OpResult addTopology(Topology t); - - OpResult removeTopology(String topologyName); - - List<StreamingCluster> listClusters(); - - OpResult addCluster(StreamingCluster cluster); - - OpResult removeCluster(String clusterId); - - List<StreamDefinition> listStreams(); - - OpResult createStream(StreamDefinition stream); - - OpResult removeStream(String streamId); - - List<Kafka2TupleMetadata> listDataSources(); - - OpResult addDataSource(Kafka2TupleMetadata dataSource); - - OpResult removeDataSource(String datasourceId); - - List<PolicyDefinition> listPolicies(); - - OpResult addPolicy(PolicyDefinition policy); - - OpResult removePolicy(String policyId); - - List<Publishment> listPublishment(); - - OpResult addPublishment(Publishment publishment); - - OpResult removePublishment(String pubId); - - @Deprecated - List<PublishmentType> listPublishmentType(); - - @Deprecated - OpResult addPublishmentType(PublishmentType publishmentType); - - @Deprecated - OpResult removePublishmentType(String pubType); - - List<AlertPublishEvent> listAlertPublishEvent(int size); - - AlertPublishEvent getAlertPublishEvent(String alertId); - - List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size); - - OpResult addAlertPublishEvent(AlertPublishEvent event); - - ScheduleState getScheduleState(String versionId); - - ScheduleState getScheduleState(); - - List<ScheduleState> listScheduleStates(); - - OpResult addScheduleState(ScheduleState state); - - OpResult clearScheduleState(int maxCapacity); - - List<PolicyAssignment> listAssignments(); - - OpResult addAssignment(PolicyAssignment assignment); - - // APIs for test friendly - OpResult clear(); - - Models export(); - - OpResult importModels(Models models); - - // ----------------------------------------------------------- - // Extended Metadata DAO Methods with default implementation - // ----------------------------------------------------------- - - Logger LOG = LoggerFactory.getLogger(IMetadataDao.class); - - default PolicyDefinition getPolicyById(String policyId) { - Preconditions.checkNotNull(policyId,"policyId"); - return listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).findAny().orElseGet(() -> { - LOG.error("Policy (policyId " + policyId + ") not found"); - throw new IllegalArgumentException("Policy (policyId " + policyId + ") not found"); - }); - } - - default List<Publishment> getPublishmentsByPolicyId(String policyId) { - return listPublishment().stream().filter(ps -> - ps.getPolicyIds() != null && ps.getPolicyIds().contains(policyId) - ).collect(Collectors.toList()); - } - - default OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) { - OpResult result = new OpResult(); - if (publishmentIds == null || publishmentIds.size() == 0) { - result.code = OpResult.FAILURE; - result.message = "Failed to add policy, there is no publisher in it"; - return result; - } - try { - Map<String,Publishment> publishmentMap = new HashMap<>(); - listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub)); - for (String publishmentId : publishmentIds) { - if (publishmentMap.containsKey(publishmentId)) { - Publishment publishment = publishmentMap.get(publishmentId); - if (publishment.getPolicyIds() == null) { - publishment.setPolicyIds(new ArrayList<>()); - } - if (publishment.getPolicyIds().contains(policyId)) { - LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId); - } else { - publishment.getPolicyIds().add(policyId); - } - OpResult opResult = addPublishment(publishment); - if (opResult.code == OpResult.FAILURE) { - LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message); - return opResult; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(opResult.message); - } - } - } else { - throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found"); - } - } - - //for other publishments, remove policyId from them, work around, we should refactor - for (String publishmentId : publishmentMap.keySet()) { - if (publishmentIds.contains(publishmentId)) { - continue; - } - Publishment publishment = publishmentMap.get(publishmentId); - if (publishment.getPolicyIds() != null && publishment.getPolicyIds().contains(policyId)) { - publishment.getPolicyIds().remove(policyId); - OpResult opResult = addPublishment(publishment); - if (opResult.code == OpResult.FAILURE) { - LOG.error("Failed to delete policy {}, from publisher {}, {} ", policyId, publishmentId, opResult.message); - return opResult; - } - } - } - result.code = OpResult.SUCCESS; - result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId; - LOG.info(result.message); - } catch (Exception ex) { - result.code = OpResult.FAILURE; - result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage(); - LOG.error(result.message,ex); - } - return result; - } - - default List<PolicyDefinition> getPoliciesBySiteId(String siteId) { - Preconditions.checkNotNull(siteId,"siteId"); - return listPolicies().stream().filter(pc -> pc.getSiteId().equals(siteId)).collect(Collectors.toList()); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java deleted file mode 100644 index 30868b5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.metadata; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -public class MetadataUtils { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataUtils.class); - public static final String META_DATA = "metadata"; - public static final String ALERT_META_DATA_DAO = "metadataDao"; - public static final String JDBC_USERNAME_PATH = "jdbc.username"; - public static final String JDBC_PASSWORD_PATH = "jdbc.password"; - public static final String JDBC_DRIVER_PATH = "jdbc.driverClassName"; - public static final String JDBC_DATABASE_PATH = "jdbc.database"; - public static final String JDBC_CONNECTION_PATH = "jdbc.connection"; - public static final String JDBC_CONNECTION_PROPERTIES_PATH = "jdbc.connectionProperties"; - public static final String MONGO_CONNECTION_PATH = "mongo.connection"; - public static final String MONGO_DATABASE = "mongo.database"; - - public static <T> String getKey(T t) { - if (t instanceof StreamDefinition) { - return ((StreamDefinition) t).getStreamId(); - } - if (t instanceof PolicyAssignment) { - return ((PolicyAssignment) t).getPolicyName(); - } - if (t instanceof ScheduleState) { - return ((ScheduleState) t).getVersion(); - } - if (t instanceof AlertPublishEvent) { - return ((AlertPublishEvent) t).getAlertId(); - } - - try { - Method m = t.getClass().getMethod("getName"); - return (String) m.invoke(t); - } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException - | IllegalArgumentException e) { - LOG.error(" getName not found on given class :" + t.getClass().getName()); - } - throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass() - .getName())); - } - - public static Connection getJdbcConnection(Config config) { - - Connection connection = null; - try { - if (config.hasPath(JDBC_USERNAME_PATH)) { - connection = DriverManager.getConnection( - config.getString(JDBC_CONNECTION_PATH), - config.getString(JDBC_USERNAME_PATH), - config.getString(JDBC_PASSWORD_PATH)); - } else { - connection = DriverManager.getConnection(config.getString(JDBC_CONNECTION_PATH)); - } - } catch (SQLException e) { - LOG.error(e.getMessage(), e); - } - return connection; - } -}
