http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java deleted file mode 100644 index e399189..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.alert; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.alert.entity.SiteDescServiceEntity; -import org.apache.eagle.service.generic.GenericEntityServiceResource; -import org.apache.eagle.alert.entity.ApplicationDescServiceEntity; -import org.apache.eagle.alert.entity.SiteApplicationServiceEntity; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.type.TypeFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.*; -import javax.ws.rs.core.MediaType; -import java.io.IOException; -import java.io.InputStream; -import java.util.*; - -@Path(SiteApplicationResource.ROOT_PATH) -public class SiteApplicationResource { - private final static Logger LOG = LoggerFactory.getLogger(SiteApplicationResource.class); - private final static GenericEntityServiceResource resource = new GenericEntityServiceResource(); - public final static String ROOT_PATH = "/module"; - - @Path("site") - @DELETE - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public GenericServiceAPIResponseEntity deleteSite(@QueryParam("site") String site) { - String siteQuery = Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@site=\"" + site + "\"]{*}"; - String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}"; - int pageSize = Integer.MAX_VALUE; - - GenericServiceAPIResponseEntity response = resource.deleteByQuery(siteQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(response.isSuccess()) { - response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(!response.isSuccess()) { - LOG.error(response.getException()); - } - } else { - LOG.error(response.getException()); - } - return response; - } - - @Path("application") - @DELETE - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public GenericServiceAPIResponseEntity deleteApplication(@QueryParam("application") String application) { - String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@application=\"" + application + "\"]{*}"; - String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@application=\"" + application + "\"]{*}"; - int pageSize = Integer.MAX_VALUE; - - GenericServiceAPIResponseEntity response = resource.deleteByQuery(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(response.isSuccess()) { - response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(!response.isSuccess()) { - LOG.error(response.getException()); - } - } else { - LOG.error(response.getException()); - } - return response; - } - - @Path("feature") - @DELETE - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public GenericServiceAPIResponseEntity deleteFeature(@QueryParam("feature") String feature) { - String featureQuery = Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@feature=\"" + feature + "\"]{*}"; - String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME + "[]{*}"; - int pageSize = Integer.MAX_VALUE; - - GenericServiceAPIResponseEntity response = resource.deleteByQuery(featureQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(response.isSuccess()) { - response = resource.search(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(response.isSuccess()) { - List<ApplicationDescServiceEntity> entityList = response.getObj(); - Boolean isModified = false; - for(ApplicationDescServiceEntity entity : entityList) { - if(entity.getFeatures().contains(feature)) { - List<String> features = entity.getFeatures(); - features.remove(feature); - entity.setFeatures(features); - isModified = true; - } - } - if(isModified) { - response = resource.updateEntities(entityList, Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME); - } - } - } - if(!response.isSuccess()) { - LOG.error(response.getException()); - } - return response; - } - - @Path("siteApplication") - @POST - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public GenericServiceAPIResponseEntity createSiteApplications(InputStream inputStream) { - GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity<>(); - int pageSize = Integer.MAX_VALUE; - try { - List<SiteApplicationObject> entities = (List<SiteApplicationObject>) unmarshalSiteApplicationEntities(inputStream); - if(entities == null) { - throw new IllegalArgumentException("cannot convert to SiteApplicationObject"); - } - List<SiteDescServiceEntity> siteEntities = new LinkedList<>(); - List<SiteApplicationServiceEntity> applicationEntities = new LinkedList<>(); - Set<String> sites = new HashSet<>(); - for(SiteApplicationObject e : entities) { - sites.add(e.getTags().get("site")); - SiteDescServiceEntity entity = new SiteDescServiceEntity(); - entity.setEnabled(e.getEnabled()); - entity.setTags(e.getTags()); - siteEntities.add(entity); - applicationEntities.addAll(e.getApplications()); - } - response = resource.updateEntities(siteEntities, Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME); - if(response.isSuccess()) { - String query = buildQueryWithAttributeList(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, "site", sites); - LOG.info("query=" + query); - response = resource.search(query, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false); - if(response.isSuccess()) { - List<SiteApplicationServiceEntity> applications = response.getObj(); - for(SiteApplicationServiceEntity app : applications) { - app.setEnabled(false); - } - response = resource.updateEntities(applications, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME); - if(response.isSuccess()) { - response = resource.updateEntities(applicationEntities, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME); - } - } - } - if(!response.isSuccess()) { - LOG.error(response.getException()); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - response.setException(ex); - } - return response; - } - - private String buildQueryWithAttributeList(String serviceName, String attr, Set<String> sets) { - StringBuilder builder = new StringBuilder(serviceName + "["); - String attribute = "@" + attr + "="; - String condition = " OR "; - for(String s : sets) { - String value = String.format("\"%s\"", s); - builder.append(attribute + value); - builder.append(condition); - } - String result = builder.substring(0, builder.length()-condition.length()); - result = result + "]{*}"; - return result; - } - - private List<? extends TaggedLogAPIEntity> unmarshalSiteApplicationEntities(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, SiteApplicationObject.class)); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java deleted file mode 100644 index 0ec9cf4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.alert.resolver; - -import java.util.List; - -public class AttributeResolveResponse<V> { - private String exception; - private List<V> values; - - public List<V> getValues() { - return values; - } - - public void setValues(List<V> values) { - this.values = values; - } - - public String getException() { - return exception; - } - - public void setException(String exception) { - this.exception = exception; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider deleted file mode 100644 index 3833f47..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider b/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider deleted file mode 100644 index 3833f47..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/pom.xml b/eagle-core/eagle-alert-parent/pom.xml index bc35fc9..9dbacf0 100644 --- a/eagle-core/eagle-alert-parent/pom.xml +++ b/eagle-core/eagle-alert-parent/pom.xml @@ -31,9 +31,6 @@ <name>eagle-alert-parent</name> <modules> <module>eagle-alert</module> - <module>eagle-alert-base</module> - <module>eagle-alert-process</module> <module>eagle-alert-service</module> - <module>eagle-alert-notification-plugin</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml b/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml deleted file mode 100644 index 919188f..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml +++ /dev/null @@ -1,142 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - ~ - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>eagle-app-parent</artifactId> - <groupId>org.apache.eagle</groupId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>eagle-stream-application-manager</artifactId> - - <dependencies> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-application-service</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-stream-pipeline</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${storm.version}</version> - <exclusions> - <exclusion> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}.0</version> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-actor_${scala.version}</artifactId> - <version>${akka.actor.version}</version> - </dependency> - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-testkit_${scala.version}</artifactId> - <version>${akka.actor.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>compile</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>add-source</goal> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skipTests>true</skipTests> - </configuration> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>TestSuite.txt</filereports> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java deleted file mode 100644 index d382629..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application; - - -public class TopologyException extends Exception { - public TopologyException(String s, Exception e) { super(s,e); } - public TopologyException(Exception e) { super(e); } - public TopologyException(String s) { super(s); } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java deleted file mode 100644 index 8f625c7..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application; - - -import com.typesafe.config.Config; - - -public interface TopologyExecutable { - void submit(String topology, Config config); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java deleted file mode 100644 index e32f48e..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application; - - -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - - -public final class TopologyFactory { - public static Logger LOG = LoggerFactory.getLogger(TopologyFactory.class); - private final static Map<String, TopologyExecutable> topologyCache = Collections.synchronizedMap(new HashMap<String, TopologyExecutable>()); - public static TopologyExecutable getTopologyInstance(String topologyClass) throws TopologyException { - TopologyExecutable instance; - if(topologyCache.containsKey(topologyClass)){ - instance = topologyCache.get(topologyClass); - } else { - try { - LOG.info("load class " + topologyClass + "with classLoader " + TopologyFactory.class.getClassLoader().toString()); - instance = (TopologyExecutable) Class.forName(topologyClass).newInstance(); - topologyCache.put(topologyClass, instance); - } catch (ClassNotFoundException e) { - throw new TopologyException("Topology in type of " + topologyClass + " is not found",e); - } catch (InstantiationException | IllegalAccessException e) { - throw new TopologyException(e); - } - } - return instance; - } - - public static void submit(String topologyClass, Config config) throws TopologyException { - TopologyExecutable topology = getTopologyInstance(topologyClass); - topology.submit(topologyClass, config); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala index 3e918cc..b5fbf59 100644 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala +++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala @@ -20,13 +20,11 @@ package org.apache.eagle.stream.application import com.typesafe.config.Config import org.apache.eagle.datastream.core.StreamContext -import org.apache.eagle.stream.pipeline.Pipeline trait AbstractDynamicApplication extends TopologyExecutable { def compileStream(application: String, config: Config): StreamContext = { - val pipeline = Pipeline.parseStringWithConfig(application, config) - Pipeline.compile(pipeline) + null } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala deleted file mode 100644 index bbfaedd..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import java.util - -import com.google.common.base.Preconditions -import org.apache.eagle.service.application.entity.TopologyExecutionStatus -import org.apache.eagle.stream.application.impl.StormExecutionPlatform -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConversions - - -object ApplicationManager { - private val LOG: Logger = LoggerFactory.getLogger(ApplicationManager.getClass) - private val workerMap: util.Map[AnyRef, TaskExecutor] = new util.TreeMap[AnyRef, TaskExecutor] - - def getWorkerMap: util.Map[AnyRef, TaskExecutor] = { - return workerMap - } - - def submit(id: AnyRef, runnable: Runnable): TaskExecutor = { - if (workerMap.containsKey(id)) { - val executor: Thread = workerMap.get(id) - if (!executor.isAlive || executor.getState.equals() ) { - LOG.info("Replacing dead executor: {}", executor) - workerMap.remove(id) - } - else { - throw new IllegalArgumentException("Duplicated id '" + id + "'") - } - } - val worker: TaskExecutor = new TaskExecutor(runnable) - LOG.info("Registering new executor %s: %s".format(id, worker)) - workerMap.put(id, worker) - worker.setName(id.toString) - worker.setDaemon(true) - worker.start - return worker - } - - def get(id: AnyRef): TaskExecutor = { - Preconditions.checkArgument(workerMap.containsKey(id)) - return workerMap.get(id) - } - - @throws(classOf[Exception]) - def stop(id: AnyRef): TaskExecutor = { - val worker: TaskExecutor = get(id) - worker.interrupt - //this.workerMap.remove(id) - return worker - } - - def getWorkerStatus(state: Thread.State): String = { - if (whereIn(state, java.lang.Thread.State.RUNNABLE, java.lang.Thread.State.TIMED_WAITING, java.lang.Thread.State.WAITING)) { - return TopologyExecutionStatus.STARTED - } - else if (whereIn(state, java.lang.Thread.State.NEW)) { - return TopologyExecutionStatus.STARTING - } - else if (whereIn(state, java.lang.Thread.State.TERMINATED)) { - return TopologyExecutionStatus.STOPPED - } - throw new IllegalStateException("Unknown state: " + state) - } - - def getTopologyStatus(status: String): String = { - if(whereIn(status, StormExecutionPlatform.KILLED)) - return TopologyExecutionStatus.STOPPING - return TopologyExecutionStatus.STARTED - } - - private def whereIn(status: String, inStatuses: String*): Boolean = { - for (_status <- inStatuses) { - if (_status.equalsIgnoreCase(status)) { - return true - } - } - return false - } - private def whereIn(state: Thread.State, inStates: Thread.State*): Boolean = { - for (_state <- inStates) { - if (_state eq state) { - return true - } - } - return false - } - - def remove(id: AnyRef) { - val executor: TaskExecutor = this.get(id) - if (executor.isAlive) { - throw new RuntimeException("Failed to remove alive executor '" + id + "'") - } - else { - this.workerMap.remove(id) - } - } - - def stopAll(): Unit ={ - JavaConversions.collectionAsScalaIterable(workerMap.values()) foreach { worker => - if(!worker.isInterrupted) { - worker.interrupt() - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala deleted file mode 100644 index 4c2df77..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import com.typesafe.config.Config -import org.apache.eagle.service.application.AppManagerConstants -import org.apache.eagle.service.application.entity.TopologyExecutionEntity - - -object ApplicationManagerUtils { - - def generateTopologyFullName(topologyExecution: TopologyExecutionEntity) = { - val fullName = "eagle-%s-%s-%s".format(topologyExecution.getSite, topologyExecution.getApplication, topologyExecution.getTopology) - fullName - } - - def buildStormTopologyURL(config: Config, topologyID: String): String = { - val clusterURL = if(config.hasPath(AppManagerConstants.CLUSTER_URL)) config.getString(AppManagerConstants.CLUSTER_URL) else AppManagerConstants.DEFAULT_CLUSTER_URL - val topologyURL = clusterURL + "/topology.html?id=" + topologyID - topologyURL - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala deleted file mode 100644 index ae0f6e8..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import java.util -import java.util.concurrent.Callable - -import akka.dispatch.Futures -import com.typesafe.config.Config -import org.apache.eagle.alert.entity.SiteApplicationServiceEntity -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity -import org.apache.eagle.policy.common.Constants -import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity} -import org.apache.eagle.service.client.EagleServiceConnector -import org.apache.eagle.service.client.impl.EagleServiceClientImpl -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConversions -import scala.concurrent.ExecutionContext - - -class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) { - private val LOG: Logger = LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO]) - private val connector: EagleServiceConnector = new EagleServiceConnector(config) - - def getEagleServiceClient(): EagleServiceClientImpl = { - return new EagleServiceClientImpl(connector) - } - - def readOperationsByStatus(status: String) = { - Futures.future(new Callable[util.List[TopologyOperationEntity]]{ - override def call(): util.List[TopologyOperationEntity] = { - val client = getEagleServiceClient() - val query = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, status) - val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send() - if(client != null) client.close() - if(!response.isSuccess || response.getObj == null) - throw new Exception(s"Fail to load operations with status $status") - response.getObj - } - }, ex) - } - - def loadAllTopologyExecutionEntities() = { - Futures.future(new Callable[util.List[TopologyExecutionEntity]]{ - override def call(): util.List[TopologyExecutionEntity] = { - val client = getEagleServiceClient() - val query = "%s[@status != \"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, TopologyExecutionStatus.NEW) - val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send() - if(client != null) client.close() - if(!response.isSuccess || response.getObj == null) throw new Exception(response.getException) - response.getObj - } - }, ex) - } - - def loadTopologyExecutionByName(site: String, appName: String, topologyName: String) = { - Futures.future(new Callable[TopologyExecutionEntity]{ - override def call(): TopologyExecutionEntity = { - val client = getEagleServiceClient() - val query = "%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, appName, topologyName) - LOG.info(s"query=$query") - val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send() - if(client != null) client.close() - if(!response.isSuccess || response.getObj == null) - throw new Exception(s"Fail to load topologyExecutionEntity with application=$appName topology=$topologyName due to Exception: ${response.getException}") - if(response.getObj.size() == 0 || response.getObj.size() > 1) { - throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity with application=$appName topology=$topologyName") - } - response.getObj.get(0) - } - }, ex) - } - - def loadTopologyDescriptionByName(site: String, application: String, topologyName: String) = { - Futures.future(new Callable[TopologyDescriptionEntity]{ - override def call(): TopologyDescriptionEntity = { - val client = getEagleServiceClient() - var query = "%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME, topologyName) - val response: GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = client.search(query).pageSize(Int.MaxValue).send() - if(!response.isSuccess || response.getObj == null || response.getObj.size() == 0) - throw new Exception(s"Fail to load TopologyDescriptionEntity with site=$site application=$application topology=$topologyName due to Exception: ${response.getException}") - val topologyDescriptionEntity = response.getObj.get(0) - - query = "%s[@site=\"%s\" AND @application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, site, application) - val configResponse: GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = client.search(query).pageSize(Int.MaxValue).send() - if (client != null) client.close() - if(!configResponse.isSuccess || configResponse.getObj == null || configResponse.getObj.size() == 0) - throw new Exception(s"Fail to load topology configuration with query=$query due to Exception: ${configResponse.getException}") - val siteApplicationEntity = configResponse.getObj.get(0) - topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig) - topologyDescriptionEntity - } - }, ex) - } - - def updateOperationStatus(operation: TopologyOperationEntity) = { - Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{ - override def call(): GenericServiceAPIResponseEntity[String] = { - if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of command[$operation] as ${operation.getStatus}") - val client = getEagleServiceClient() - operation.setLastModifiedDate(System.currentTimeMillis()) - val response= client.update(java.util.Arrays.asList(operation), classOf[TopologyOperationEntity]) - if(client != null) client.close() - if(response.isSuccess) { - LOG.info(s"Updated operation status [$operation] as: ${operation.getStatus}") - } else { - LOG.error(s"Failed to update status as ${operation.getStatus} of command[$operation]") - throw new RuntimeException(s"Failed to update command due to exception: ${response.getException}") - } - response - } - }, ex) - } - - def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = { - Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{ - override def call(): GenericServiceAPIResponseEntity[String] = { - if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] as ${topology.getStatus}") - val client = getEagleServiceClient() - topology.setLastModifiedDate(System.currentTimeMillis()) - if(client != null) client.close() - val response= client.update(java.util.Arrays.asList(topology), classOf[TopologyExecutionEntity]) - if(response.isSuccess) { - LOG.info(s"Updated status application[$topology] as: ${topology.getStatus}") - } else { - LOG.error(s"Failed to update status as ${topology.getStatus} of application[$topology] due to ${response.getException}") - } - response - } - }, ex) - } - - def clearPendingOperations() = { - Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{ - override def call(): GenericServiceAPIResponseEntity[String] = { - LOG.info("start to clear operation") - val query: String = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, TopologyOperationEntity.OPERATION_STATUS.PENDING) - val client = getEagleServiceClient() - val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send() - var ret: GenericServiceAPIResponseEntity[String] = new GenericServiceAPIResponseEntity[String]() - if (response.isSuccess && response.getObj.size != 0) { - val pendingOperations: util.List[TopologyOperationEntity] = response.getObj - val failedOperations: util.List[TopologyOperationEntity] = new util.ArrayList[TopologyOperationEntity] - JavaConversions.collectionAsScalaIterable(pendingOperations) foreach { operation => - operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - failedOperations.add(operation) - } - ret = client.update(failedOperations, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME) - if (client != null) client.close() - if (ret.isSuccess) { - LOG.info(s"Successfully clear ${failedOperations.size()} pending operations") - } else { - LOG.error(s"Failed to clear pending operations due to exception:" + ret.getException) - } - } - ret - } - }, ex) - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala deleted file mode 100644 index 88271bb..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import com.typesafe.config.Config -import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyDescriptionEntity} - - -trait ExecutionPlatform { - def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config) - def stop(topologyExecution: TopologyExecutionEntity, config: Config) - def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config) - def status(topologyExecution: TopologyExecutionEntity, config: Config) -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala deleted file mode 100644 index 6b9c033..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import org.apache.eagle.service.application.AppManagerConstants -import org.apache.eagle.stream.application.impl.StormExecutionPlatform -import org.slf4j.{LoggerFactory, Logger} - -import scala.collection.mutable - - -object ExecutionPlatformFactory { - private val LOG: Logger = LoggerFactory.getLogger(ExecutionPlatformFactory.getClass) - - var managerCache = new mutable.HashMap[String, ExecutionPlatform] with - mutable.SynchronizedMap[String, ExecutionPlatform] - - def getApplicationManager(managerType: String): ExecutionPlatform = { - if(managerCache.contains(managerType)) { - managerCache.get(managerType).get - } else { - managerType match { - case AppManagerConstants.EAGLE_CLUSTER_STORM => - val instance = new StormExecutionPlatform - managerCache.put(managerType, instance) - instance - case _ => - throw new Exception(s"Invalid managerType $managerType") - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala deleted file mode 100644 index 07737ac..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import org.codehaus.jackson.annotate.JsonIgnore - -class TaskExecutor(runnable: Runnable) extends Thread(runnable) { - - @JsonIgnore override def getContextClassLoader: ClassLoader = { - return super.getContextClassLoader - } - - @JsonIgnore override def getUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { - return super.getUncaughtExceptionHandler - } - - def shutdown { - this.interrupt - } - - def restart { - this.interrupt - this.start - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala deleted file mode 100644 index 7d52649..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.impl - -import com.typesafe.config.Config -import org.apache.eagle.datastream.ExecutionEnvironments -import org.apache.eagle.datastream.storm.StormExecutionEnvironment -import org.apache.eagle.stream.application.AbstractDynamicApplication -import org.slf4j.LoggerFactory - - -object StormDynamicTopology extends AbstractDynamicApplication { - val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication]) - - override def submit(application: String, config: Config) { - val stream = compileStream(application, config) - var ret = true - - try { - val stormEnv = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig) - stream.submit(stormEnv) - } catch { - case e: Throwable => - ret = false - LOG.error(e.toString) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala deleted file mode 100644 index af4cafa..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.impl - -import java.net.URLDecoder -import java.nio.file.{Files, Paths} - -import backtype.storm.generated.InvalidTopologyException -import backtype.storm.utils.{NimbusClient, Utils} -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.eagle.common.config.EagleConfigConstants -import org.apache.eagle.service.application.AppManagerConstants -import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus} -import org.apache.eagle.stream.application.{ApplicationManager, ApplicationManagerUtils, ExecutionPlatform, TopologyFactory} -import org.slf4j.LoggerFactory - -import scala.collection.JavaConversions - -object StormExecutionPlatform { - val ACTIVE: String = "ACTIVE" - val INACTIVE: String = "INACTIVE" - val KILLED: String = "KILLED" - val REBALANCING: String = "REBALANCING" -} - -class StormExecutionPlatform extends ExecutionPlatform { - val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform]) - - private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = { - val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]] - conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]]) - - if(appConfig.hasPath("envContextConfig.nimbusHost")) { - LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${appConfig.getString("envContextConfig.nimbusHost")}") - conf.put(backtype.storm.Config.NIMBUS_HOST, appConfig.getString("envContextConfig.nimbusHost")) - } - - if(appConfig.hasPath("envContextConfig.nimbusThriftPort")) { - LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${appConfig.getString("envContextConfig.nimbusThriftPort")}") - conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort")) - } - NimbusClient.getConfiguredClient(conf) - } - - def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = { - val worker: Thread = ApplicationManager.submit(topologyName, new Runnable { - override def run(): Unit = { - try { - val topologyType = topology.getType.toUpperCase() - topologyType match { - case TopologyDescriptionEntity.TYPE.CLASS => - TopologyFactory.submit(topology.getExeClass, config) - case TopologyDescriptionEntity.TYPE.DYNAMIC => - StormDynamicTopology.submit(topology.getExeClass, config) - case m@_ => - LOG.error("Unsupported topology type: " + topology.getType) - } - } catch { - case ex: Throwable => - LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}") - } - } - }) - topologyExecution.setFullName(topologyName) - topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState)) - topologyExecution.setDescription("Running inside " + worker.toString + " in local mode") - } - - override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = { - val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8") - if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) { - val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file" - LOG.error(errMsg) - throw new Exception(errMsg) - } - LOG.info(s"Detected a storm.jar location at: $stormJarPath") - System.setProperty("storm.jar", stormJarPath) - - val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution) - val extConfigStr = "envContextConfig.topologyName=%s".format(fullName) - val extConfig = ConfigFactory.parseString(extConfigStr) - val newConfig = extConfig.withFallback(config) - - val mode = if(config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE - topologyExecution.setMode(mode) - if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) { - startLocal(fullName, topology, topologyExecution, newConfig) - return - } - - val topologyType = topology.getType.toUpperCase() - topologyType match { - case TopologyDescriptionEntity.TYPE.CLASS => - TopologyFactory.submit(topology.getExeClass, newConfig) - case TopologyDescriptionEntity.TYPE.DYNAMIC => - StormDynamicTopology.submit(topology.getExeClass, newConfig) - case m@_ => - throw new InvalidTopologyException("Unsupported topology type: " + topology.getType) - } - topologyExecution.setFullName(fullName) - //topologyExecution.setStatus(TopologyExecutionStatus.STARTED) - } - - override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = { - val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution) - - if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) { - stopLocal(name, topologyExecution) - } else { - getNimbusClient(config).getClient.killTopology(name) - topologyExecution.setStatus(TopologyExecutionStatus.STOPPING) - //topologyExecution.setDescription("") - } - } - - def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = { - val taskWorker = ApplicationManager.stop(name) - topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState)) - topologyExecution.setDescription(s"topology status is ${taskWorker.getState}") - /*try{ - ApplicationManager.remove(name) - } catch { - case ex: IllegalArgumentException => - LOG.warn(s"ApplicationManager.remove($name) failed as it has been removed") - }*/ - } - - - def getTopology(topologyName: String, config: Config) = { - val topologySummery = getNimbusClient(config).getClient.getClusterInfo.get_topologies - JavaConversions.collectionAsScalaIterable(topologySummery).find { t => t.get_name.equals(topologyName) } - match { - case Some(t) => Some(t) - case None => None - } - } - - override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = { - val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution) - - if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) { - statusLocal(name, topologyExecution) - } else { - val topology = getTopology(name, config) - topology match { - case Some(topology) => - topologyExecution.setStatus(ApplicationManager.getTopologyStatus(topology.get_status())) - topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, topology.get_id())) - topologyExecution.setDescription(topology.toString) - case None => - topologyExecution.setStatus(TopologyExecutionStatus.STOPPED) - topologyExecution.setUrl("") - topologyExecution.setDescription("") - } - } - } - - def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = { - try { - val currentStatus = topologyExecution.getStatus() - val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState()) - if (!currentStatus.equals(newStatus)) { - LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus)) - topologyExecution.setStatus(newStatus) - topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus)) - } else if(currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) { - ApplicationManager.remove(name) - } - }catch { - case ex: Throwable => - topologyExecution.setDescription("") - topologyExecution.setStatus(TopologyExecutionStatus.STOPPED) - } - } - - override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = { - JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { - topologyExecution => status(topologyExecution, config) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala deleted file mode 100644 index 8fbf60d..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.scheduler - -import java.util.concurrent.Callable - -import akka.actor.{Actor, ActorLogging} -import akka.dispatch.Futures -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigSyntax} -import org.apache.eagle.common.config.EagleConfigConstants -import org.apache.eagle.service.application.AppManagerConstants -import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION -import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity} -import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, ExecutionPlatformFactory} - -import scala.collection.JavaConversions -import scala.util.{Failure, Success} - - -private[scheduler] class AppCommandExecutor extends Actor with ActorLogging { - @volatile var _config: Config = _ - @volatile var _dao: ApplicationSchedulerAsyncDAO = _ - - import context.dispatcher - - def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = { - val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false) - _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete { - case Success(topology) => - val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options) - - if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) { - topologyOperation.setMessage("Fail to detect topology configuration") - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(topologyOperation) - } else { - val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config) - val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM - topologyExecution.setEnvironment(clusterType) - - Futures.future(new Callable[TopologyExecutionEntity]{ - override def call(): TopologyExecutionEntity = { - topologyExecution.setStatus(TopologyExecutionStatus.STARTING) - _dao.updateTopologyExecutionStatus(topologyExecution) - ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS) - topologyExecution - } - }, context.dispatcher) onComplete { - case Success(topologyExecutionEntity) => - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS) - updateStatus(topologyExecution, topologyOperation) - case Failure(ex) => - topologyOperation.setMessage(ex.getMessage) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(topologyOperation) - } - } - - case Failure(ex) => - topologyOperation.setMessage(ex.getMessage) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(topologyOperation) - } - } - - def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = { - val clusterType = topologyExecution.getEnvironment - - Futures.future(new Callable[TopologyExecutionEntity]{ - override def call(): TopologyExecutionEntity = { - topologyExecution.setStatus(TopologyExecutionStatus.STOPPING) - _dao.updateTopologyExecutionStatus(topologyExecution) - ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS) - topologyExecution - } - }, context.dispatcher) onComplete { - case Success(topologyExecutionEntity) => - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS) - updateStatus(topologyExecution, topologyOperation) - case Failure(ex) => - topologyOperation.setMessage(ex.toString) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(topologyOperation) - } - } - - def status(topologyExecution: TopologyExecutionEntity) = { - val clusterType = topologyExecution.getEnvironment - - Futures.future(new Callable[TopologyExecutionEntity]{ - override def call(): TopologyExecutionEntity = { - ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config) - topologyExecution - } - }, context.dispatcher) onComplete { - case _ => - _dao.updateTopologyExecutionStatus(topologyExecution) - } - } - - def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = { - _dao.updateOperationStatus(topologyOperation) - _dao.updateTopologyExecutionStatus(topologyExecution) - } - - def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = { - try { - topologyOperation.getOperation match { - case OPERATION.START => - start(topologyExecution, topologyOperation) - case OPERATION.STOP => - stop(topologyExecution, topologyOperation) - case m@_ => - log.warning("Unsupported operation: " + topologyOperation) - throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP") - } - } catch { - case e: Throwable => - topologyOperation.setMessage(e.getMessage) - topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(topologyOperation) - } - } - - override def receive = { - case InitializationEvent(config: Config) => - _config = config - _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher) - case SchedulerCommand(topologyExecution, topologyOperation) => - execute(topologyExecution, topologyOperation) - case HealthCheckerEvent => - _dao.loadAllTopologyExecutionEntities() onComplete { - case Success(topologyExecutions) => - log.info(s"Load ${topologyExecutions.size()} topologies in execution") - JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution => - try{ - status(topologyExecution) - } catch { - case ex: Throwable => - log.error(ex.getMessage) - } - } - case Failure(ex) => - log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}") - } - case TerminatedEvent => - context.stop(self) - case m@_ => - log.warning("Unsupported operation $m") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala deleted file mode 100644 index c731846..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.scheduler - -import akka.actor.{Actor, ActorLogging} -import com.typesafe.config.Config -import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS -import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO - -import scala.collection.JavaConversions -import scala.util.{Failure, Success} - - -private[scheduler] class AppCommandLoader extends Actor with ActorLogging { - @volatile var _config: Config = null - @volatile var _dao: ApplicationSchedulerAsyncDAO = null - - import context.dispatcher - - override def receive = { - case InitializationEvent(config: Config) => - _config = config - _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher) - case ClearPendingOperation => - if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher) - _dao.clearPendingOperations() - case CommandLoaderEvent => { - val _sender = sender() - _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete { - case Success(commands) => { - log.info(s"Load ${commands.size()} new commands") - JavaConversions.collectionAsScalaIterable(commands) foreach { command => - command.setStatus(OPERATION_STATUS.PENDING) - _dao.updateOperationStatus(command) onComplete { - case Success(response) => - _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete { - case Success(topologyExecution) => { - _sender ! SchedulerCommand(topologyExecution, command) - } - case Failure(ex) => - log.error(ex.getMessage) - command.setMessage(ex.getMessage) - command.setStatus(OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(command) - } - case Failure(ex) => - log.error(s"Got an exception to update command status $command: ${ex.getMessage}") - command.setMessage(ex.getMessage) - command.setStatus(OPERATION_STATUS.FAILED) - _dao.updateOperationStatus(command) - } - } - } - case Failure(ex) => - log.error(s"Failed to get commands due to exception ${ex.getMessage}") - } - } - case TerminatedEvent => - context.stop(self) - case m@_ => throw new UnsupportedOperationException(s"Event is not supported $m") - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala deleted file mode 100644 index 476a3fb..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.scheduler - -import akka.actor.{ActorSystem, Props} -import com.typesafe.config.Config -import org.apache.eagle.service.application.AppManagerConstants -import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyOperationEntity} -import org.apache.eagle.stream.application.ApplicationManager - -import scala.concurrent.duration._ - - -private[scheduler] class ScheduleEvent -private[scheduler] case class InitializationEvent(config: Config) extends ScheduleEvent -private[scheduler] case class TerminatedEvent() extends ScheduleEvent -private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent -private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent -private[scheduler] case class ClearPendingOperation() extends ScheduleEvent -private[scheduler] case class SchedulerCommand(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends ScheduleEvent - -case class EagleServiceUnavailableException(message:String) extends Exception(message) -case class DuplicatedDefinitionException(message:String) extends Exception(message) -case class LoadTopologyFailureException(message:String) extends Exception(message) - - -/** - * 1. Sync command from eagle service - * 2. Coordinate command to different actor - * 3. Actor execute command as requested - */ -class ApplicationScheduler { - //val config = ConfigFactory.load() - val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2 - val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5 - - def start(config: Config) = { - val system = ActorSystem("application-manager-scheduler", config) - system.log.info(s"Started actor system: $system") - - import system.dispatcher - - val commandLoaderIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS) else DEFAULT_COMMAND_LOADER_INTERVAL_SECS - val healthCheckIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS) else DEFAULT_HEALTH_CHECK_INTERVAL_SECS - - val coordinator = system.actorOf(Props[StreamAppCoordinator]) - system.scheduler.scheduleOnce(0 seconds, coordinator, InitializationEvent(config)) - system.scheduler.scheduleOnce(1 seconds, coordinator, ClearPendingOperation) - system.scheduler.schedule(2.seconds, commandLoaderIntervalSecs.seconds, coordinator, CommandLoaderEvent) - system.scheduler.schedule(10.seconds, healthCheckIntervalSecs.seconds, coordinator, HealthCheckerEvent) - - /* - registerOnTermination is called when you have shut down the ActorSystem (system.shutdown), - and the callbacks will be executed after all actors have been stopped. - */ - system.registerOnTermination(new Runnable { - override def run(): Unit = { - coordinator ! TerminatedEvent - ApplicationManager.stopAll() - } - }) - system - } -} - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala deleted file mode 100644 index 17006ee..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.scheduler - -import akka.actor.{Actor, ActorLogging, ActorRef, Props} - -private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging { - var commandLoader: ActorRef = null - var commandExecutor: ActorRef = null - - - override def preStart(): Unit = { - commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader") - commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker") - } - - override def receive = { - case InitializationEvent(config) => { - log.info(s"Config updated: $config") - commandLoader ! InitializationEvent(config) - commandExecutor ! InitializationEvent(config) - } - case ClearPendingOperation => - commandLoader ! ClearPendingOperation - case CommandLoaderEvent => - commandLoader ! CommandLoaderEvent - case command: SchedulerCommand => - log.info(s"Executing command: $SchedulerCommand") - commandExecutor ! command - case HealthCheckerEvent => - commandExecutor ! HealthCheckerEvent - case TerminatedEvent => - log.info("Coordinator exit ...") - context.stop(self) - case m@_ => - log.warning(s"Coordinator Unsupported message: $m") - } -}
