Repository: incubator-eagle Updated Branches: refs/heads/develop 9432fcf91 -> acee5cb33
[EAGLE-458] Migrate eagle-jpm-spark-running using application framework https://issues.apache.org/jira/browse/EAGLE-458 Author: Hao Chen <h...@apache.org> Closes #335 from haoch/EAGLE-458. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/acee5cb3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/acee5cb3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/acee5cb3 Branch: refs/heads/develop Commit: acee5cb334a266ed1f7cb215d0f8252fd5b4e067 Parents: 9432fcf Author: Hao Chen <h...@apache.org> Authored: Fri Aug 19 12:16:57 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Fri Aug 19 12:16:57 2016 +0800 ---------------------------------------------------------------------- .../eagle/app/service/ApplicationContext.java | 20 +- .../apache/eagle/app/test/AppJUnitRunner.java | 2 +- .../eagle/app/test/AppTestGuiceModule.java | 42 ---- .../eagle/app/test/ApplicationSimulator.java | 70 +++++++ .../app/test/ApplicationSimulatorImpl.java | 92 +++++++++ .../eagle/app/test/ApplicationTestBase.java | 43 ++++ .../app/test/ApplicationTestGuiceModule.java | 42 ++++ .../apache/eagle/app/test/ServerSimulator.java | 70 ------- .../eagle/app/test/ServerSimulatorImpl.java | 92 --------- .../example/ExampleApplicationProviderTest.java | 4 +- eagle-jpm/eagle-jpm-spark-running/pom.xml | 5 + .../jpm/spark/running/SparkRunningJobApp.java | 67 +++++++ .../spark/running/SparkRunningJobAppConfig.java | 175 +++++++++++++++++ .../running/SparkRunningJobAppProvider.java | 26 +++ .../jpm/spark/running/SparkRunningJobMain.java | 61 +----- .../common/SparkRunningConfigManager.java | 151 -------------- .../parser/SparkAppEntityCreationHandler.java | 6 +- .../running/parser/SparkApplicationParser.java | 10 +- .../running/recover/SparkRunningJobManager.java | 4 +- .../storm/SparkRunningJobFetchSpout.java | 14 +- .../running/storm/SparkRunningJobParseBolt.java | 18 +- 
...spark.running.SparkRunningJobAppProvider.xml | 195 +++++++++++++++++++ ...org.apache.eagle.app.spi.ApplicationProvider | 16 ++ .../src/main/resources/application.conf | 9 +- .../java/SparkRunningJobAppProviderTest.java | 32 +++ pom.xml | 1 + 26 files changed, 808 insertions(+), 459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java index 76ee289..91d33ca 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java @@ -73,15 +73,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle { @Override public void onInstall() { - List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { - StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(),this.config); - StreamDesc streamDesc = new StreamDesc(); - streamDesc.setSchema(streamDefinition); - streamDesc.setSink(streamSinkConfig); - streamDesc.setStreamId(streamDefinition.getStreamId()); - return streamDesc; - })).collect(Collectors.toList()); - metadata.setStreams(streamDescCollection); + if(metadata.getDescriptor().getStreams()!=null) { + List<StreamDesc> streamDescCollection = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> { + StreamSinkConfig streamSinkConfig = 
this.runtime.environment().streamSink().getSinkConfig(streamDefinition.getStreamId(), this.config); + StreamDesc streamDesc = new StreamDesc(); + streamDesc.setSchema(streamDefinition); + streamDesc.setSink(streamSinkConfig); + streamDesc.setStreamId(streamDefinition.getStreamId()); + return streamDesc; + })).collect(Collectors.toList()); + metadata.setStreams(streamDescCollection); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java index 572af2c..b8174bb 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppJUnitRunner.java @@ -52,7 +52,7 @@ public class AppJUnitRunner extends BlockJUnit4ClassRunner { throws InitializationError { final List<Module> modules = new ArrayList<>(); - AppTestGuiceModule testGuiceModule = new AppTestGuiceModule(); + ApplicationTestGuiceModule testGuiceModule = new ApplicationTestGuiceModule(); // Add default modules modules.add(testGuiceModule); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java deleted file mode 100644 index 9b30ee4..0000000 --- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.test; - -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; -import org.apache.eagle.app.module.ApplicationExtensionLoader; -import org.apache.eagle.app.module.ApplicationGuiceModule; -import org.apache.eagle.common.module.CommonGuiceModule; -import org.apache.eagle.common.module.GlobalScope; -import org.apache.eagle.common.module.ModuleRegistry; -import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; - -public class AppTestGuiceModule extends AbstractModule{ - @Override - protected void configure() { - CommonGuiceModule common = new CommonGuiceModule(); - ApplicationGuiceModule app = new ApplicationGuiceModule(); - MemoryMetadataStore store = new MemoryMetadataStore(); - install(common); - install(app); - install(store); - ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store); - registry.getModules(store.getClass()).forEach(this::install); - registry.getModules(GlobalScope.class).forEach(this::install); - 
bind(ServerSimulator.class).to(ServerSimulatorImpl.class).in(Singleton.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java new file mode 100644 index 0000000..3e4aa21 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.app.test; + +import com.google.inject.Guice; +import com.google.inject.Module; +import org.apache.eagle.app.spi.ApplicationProvider; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Application test simulator for developer to quickly run application without diving into application lifecycle + */ +public abstract class ApplicationSimulator { + /** + * + * @param appType + */ + public abstract void start(String appType); + + /** + * + * @param appType + * @param appConfig + */ + public abstract void start(String appType, Map<String,Object> appConfig); + + /** + * + * @param appProviderClass + */ + public abstract void start(Class<? extends ApplicationProvider> appProviderClass); + + /** + * + * @param appProviderClass + * @param appConfig + */ + public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception; + + public static ApplicationSimulator getInstance(){ + return Guice.createInjector(new ApplicationTestGuiceModule()).getInstance(ApplicationSimulator.class); + } + + /** + * @param modules additional modules + * @return ApplicationSimulator instance + */ + public static ApplicationSimulator getInstance(Module ... 
modules){ + List<Module> contextModules = Arrays.asList(modules); + contextModules.add(new ApplicationTestGuiceModule()); + return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java new file mode 100644 index 0000000..35dead2 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.app.test; + +import com.google.inject.Inject; +import com.typesafe.config.Config; +import org.apache.eagle.app.config.ApplicationProviderConfig; +import org.apache.eagle.app.resource.ApplicationResource; +import org.apache.eagle.app.service.ApplicationOperations; +import org.apache.eagle.app.spi.ApplicationProvider; +import org.apache.eagle.app.utils.DynamicJarPathFinder; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.model.SiteEntity; +import org.apache.eagle.metadata.resource.SiteResource; +import org.junit.Assert; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class ApplicationSimulatorImpl extends ApplicationSimulator { + private final Config config; + private final SiteResource siteResource; + private final ApplicationResource applicationResource; + + @Inject + public ApplicationSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){ + this.config = config; + this.siteResource = siteResource; + this.applicationResource = applicationResource; + } + + @Override + public void start(String appType) { + start(appType, new HashMap<>()); + } + + @Override + public void start(String appType, Map<String, Object> appConfig) { + SiteEntity siteEntity = getUniqueSite(); + siteResource.createSite(siteEntity); + Assert.assertNotNull(siteEntity.getUuid()); + ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL); + installOperation.setConfiguration(appConfig); + // Install application + ApplicationEntity applicationEntity = + applicationResource.installApplication(installOperation).getData(); + // Start application + applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid())); + } + + private final static AtomicInteger incr = new 
AtomicInteger(); + + private SiteEntity getUniqueSite(){ + // Create local site + SiteEntity siteEntity = new SiteEntity(); + siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet()); + siteEntity.setSiteName(siteEntity.getSiteId()); + siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")"); + return siteEntity; + } + + @Override + public void start(Class<? extends ApplicationProvider> appProviderClass) { + start(appProviderClass, new HashMap<>()); + } + + @Override + public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) { + try { + ApplicationProvider applicationProvider = appProviderClass.newInstance(); + applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config); + start(applicationProvider.getApplicationDesc().getType(),appConfig); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e.getMessage(),e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java new file mode 100644 index 0000000..1c7d6be --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestBase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.test; + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.eagle.common.module.CommonGuiceModule; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; +import org.junit.After; +import org.junit.Before; + +import javax.sql.DataSource; +import java.sql.SQLException; + +public class ApplicationTestBase { + private Injector injector; + + @Before + public void setUp(){ + injector = Guice.createInjector(new ApplicationTestGuiceModule()); + injector.injectMembers(this); + } + + public Injector injector(){ + return injector; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java new file mode 100644 index 0000000..4f7b2b4 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationTestGuiceModule.java @@ -0,0 +1,42 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.test; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import org.apache.eagle.app.module.ApplicationExtensionLoader; +import org.apache.eagle.app.module.ApplicationGuiceModule; +import org.apache.eagle.common.module.CommonGuiceModule; +import org.apache.eagle.common.module.GlobalScope; +import org.apache.eagle.common.module.ModuleRegistry; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; + +public class ApplicationTestGuiceModule extends AbstractModule{ + @Override + protected void configure() { + CommonGuiceModule common = new CommonGuiceModule(); + ApplicationGuiceModule app = new ApplicationGuiceModule(); + MemoryMetadataStore store = new MemoryMetadataStore(); + install(common); + install(app); + install(store); + ModuleRegistry registry =ApplicationExtensionLoader.load(common,app,store); + registry.getModules(store.getClass()).forEach(this::install); + registry.getModules(GlobalScope.class).forEach(this::install); + bind(ApplicationSimulator.class).to(ApplicationSimulatorImpl.class).in(Singleton.class); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java deleted file mode 100644 index a91af77..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulator.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.test; - -import com.google.inject.Guice; -import com.google.inject.Module; -import org.apache.eagle.app.spi.ApplicationProvider; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * Application test simulator for developer to quickly run application without diving into application lifecycle - */ -public abstract class ServerSimulator { - /** - * - * @param appType - */ - public abstract void start(String appType); - - /** - * - * @param appType - * @param appConfig - */ - public abstract void start(String appType, Map<String,Object> appConfig); - - /** - * - * @param appProviderClass - */ - public abstract void start(Class<? extends ApplicationProvider> appProviderClass); - - /** - * - * @param appProviderClass - * @param appConfig - */ - public abstract void start(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception; - - public static ServerSimulator getInstance(){ - return Guice.createInjector(new AppTestGuiceModule()).getInstance(ServerSimulator.class); - } - - /** - * @param modules additional modules - * @return ServerSimulator instance - */ - public static ServerSimulator getInstance(Module ... 
modules){ - List<Module> contextModules = Arrays.asList(modules); - contextModules.add(new AppTestGuiceModule()); - return Guice.createInjector(contextModules).getInstance(ServerSimulator.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java deleted file mode 100644 index 1ef91ff..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.test; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.apache.eagle.app.config.ApplicationProviderConfig; -import org.apache.eagle.app.resource.ApplicationResource; -import org.apache.eagle.app.service.ApplicationOperations; -import org.apache.eagle.app.spi.ApplicationProvider; -import org.apache.eagle.app.utils.DynamicJarPathFinder; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.metadata.model.SiteEntity; -import org.apache.eagle.metadata.resource.SiteResource; -import org.junit.Assert; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -public class ServerSimulatorImpl extends ServerSimulator { - private final Config config; - private final SiteResource siteResource; - private final ApplicationResource applicationResource; - - @Inject - public ServerSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource){ - this.config = config; - this.siteResource = siteResource; - this.applicationResource = applicationResource; - } - - @Override - public void start(String appType) { - start(appType, new HashMap<>()); - } - - @Override - public void start(String appType, Map<String, Object> appConfig) { - SiteEntity siteEntity = getUniqueSite(); - siteResource.createSite(siteEntity); - Assert.assertNotNull(siteEntity.getUuid()); - ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL); - installOperation.setConfiguration(appConfig); - // Install application - ApplicationEntity applicationEntity = - applicationResource.installApplication(installOperation).getData(); - // Start application - applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid())); - } - - private final static AtomicInteger incr = new AtomicInteger(); - - 
private SiteEntity getUniqueSite(){ - // Create local site - SiteEntity siteEntity = new SiteEntity(); - siteEntity.setSiteId("SIMULATED_SITE_"+incr.incrementAndGet()); - siteEntity.setSiteName(siteEntity.getSiteId()); - siteEntity.setDescription("Automatically generated unique simulation site "+siteEntity.getSiteId()+" (simulator: "+this+")"); - return siteEntity; - } - - @Override - public void start(Class<? extends ApplicationProvider> appProviderClass) { - start(appProviderClass, new HashMap<>()); - } - - @Override - public void start(Class<? extends ApplicationProvider> appProviderClass, Map<String, Object> appConfig) { - try { - ApplicationProvider applicationProvider = appProviderClass.newInstance(); - applicationProvider.prepare(new ApplicationProviderConfig(DynamicJarPathFinder.findPath(appProviderClass),appProviderClass),config); - start(applicationProvider.getApplicationDesc().getType(),appConfig); - } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e.getMessage(),e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java index e07f487..1c801bd 100644 --- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java +++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java @@ -21,7 +21,7 @@ import org.apache.eagle.app.example.extensions.ExampleEntity; import org.apache.eagle.app.example.extensions.ExampleResource; import 
org.apache.eagle.app.resource.ApplicationResource; import org.apache.eagle.app.service.ApplicationOperations; -import org.apache.eagle.app.test.ServerSimulator; +import org.apache.eagle.app.test.ApplicationSimulator; import org.apache.eagle.app.test.AppJUnitRunner; import org.apache.eagle.common.module.GlobalScope; import org.apache.eagle.metadata.model.ApplicationDesc; @@ -41,7 +41,7 @@ import java.util.Map; public class ExampleApplicationProviderTest { @Inject private SiteResource siteResource; @Inject private ApplicationResource applicationResource; - @Inject private ServerSimulator simulator; + @Inject private ApplicationSimulator simulator; @Inject private ExampleResource exampleResource; @Test http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml index cc53e7c..34d8545 100644 --- a/eagle-jpm/eagle-jpm-spark-running/pom.xml +++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml @@ -132,6 +132,11 @@ <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java new file mode 100644 index 0000000..61c0751 --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.spark.running; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.typesafe.config.Config; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout; +import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt; + +public class SparkRunningJobApp extends StormApplication { + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + //1. trigger init conf + SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.getInstance(config); + + //2. 
init topology + TopologyBuilder topologyBuilder = new TopologyBuilder(); + final String spoutName = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME; + final String boltName = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME; + int parallelism = sparkRunningJobAppConfig.getTopologyConfig().jobFetchSpoutParallism; + int tasks = sparkRunningJobAppConfig.getTopologyConfig().jobFetchSpoutTasksNum; + if (parallelism > tasks) { + parallelism = tasks; + } + topologyBuilder.setSpout( + spoutName, + new SparkRunningJobFetchSpout( + sparkRunningJobAppConfig.getJobExtractorConfig(), + sparkRunningJobAppConfig.getEndpointConfig(), + sparkRunningJobAppConfig.getZkStateConfig()), + parallelism + ).setNumTasks(tasks); + + parallelism = sparkRunningJobAppConfig.getTopologyConfig().jobParseBoltParallism; + tasks = sparkRunningJobAppConfig.getTopologyConfig().jobParseBoltTasksNum; + if (parallelism > tasks) { + parallelism = tasks; + } + topologyBuilder.setBolt(boltName, + new SparkRunningJobParseBolt( + sparkRunningJobAppConfig.getZkStateConfig(), + sparkRunningJobAppConfig.getEagleServiceConfig(), + sparkRunningJobAppConfig.getEndpointConfig(), + sparkRunningJobAppConfig.getJobExtractorConfig()), + parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); + + return topologyBuilder.createTopology(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java new file mode 100644 index 0000000..668bc02 --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.spark.running; + +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class SparkRunningJobAppConfig implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobAppConfig.class); + static final String JOB_FETCH_SPOUT_NAME = "sparkRunningJobFetchSpout"; + static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt"; + + public String getEnv() { + return env; + } + private String env; + + ZKStateConfig getZkStateConfig() { return zkStateConfig; } + private ZKStateConfig zkStateConfig; + private TopologyConfig topologyConfig; + public TopologyConfig getTopologyConfig(){ + return topologyConfig; + } + + public EagleServiceConfig getEagleServiceConfig() { + return eagleServiceConfig; + } + private EagleServiceConfig eagleServiceConfig; + + public JobExtractorConfig getJobExtractorConfig() { + return jobExtractorConfig; + 
} + private JobExtractorConfig jobExtractorConfig; + + public EndpointConfig getEndpointConfig() { + return endpointConfig; + } + private EndpointConfig endpointConfig; + + public static class TopologyConfig implements Serializable { + public int jobFetchSpoutParallism; + public int jobFetchSpoutTasksNum; + public int jobParseBoltParallism; + public int jobParseBoltTasksNum; + } + + public static class ZKStateConfig implements Serializable { + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; + public String zkPort; + public boolean recoverEnabled; + } + + public static class EagleServiceConfig implements Serializable { + public String eagleServiceHost; + public int eagleServicePort; + public int readTimeoutSeconds; + public int maxFlushNum; + public String username; + public String password; + } + + public static class JobExtractorConfig implements Serializable { + public String site; + public int fetchRunningJobInterval; + public int parseThreadPoolSize; + } + + public static class EndpointConfig implements Serializable { + public String nnEndpoint; + public String eventLog; + public String[] rmUrls; + public String principal; + public String keyTab; + } + + public Config getConfig() { + return config; + } + private Config config; + + private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig(); + + private SparkRunningJobAppConfig() { + this.eagleServiceConfig = new EagleServiceConfig(); + this.jobExtractorConfig = new JobExtractorConfig(); + this.endpointConfig = new EndpointConfig(); + this.zkStateConfig = new ZKStateConfig(); + this.topologyConfig = new TopologyConfig(); + } + + public static SparkRunningJobAppConfig getInstance(String[] args) { + try { + LOG.info("Loading from configuration file"); + manager.init(new ConfigOptionParser().load(args)); + } catch (Exception e) { + LOG.error("failed to load config"); + } + return manager; + } + + public 
static SparkRunningJobAppConfig getInstance(Config config) { + manager.init(config); + return manager; + } + + private void init(Config config){ + this.config = config; + this.env = config.getString("envContextConfig.env"); + this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum"); + this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort"); + this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs"); + this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes"); + this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval"); + this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot"); + this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled"); + + + // parse eagle service endpoint + this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host"); + String port = config.getString("eagleProps.eagleService.port"); + this.eagleServiceConfig.eagleServicePort = (port == null ? 
8080 : Integer.parseInt(port)); + this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username"); + this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password"); + this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds"); + this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum"); + + //parse job extractor + this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site"); + this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval"); + this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize"); + + //parse data source config + this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog"); + this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint"); + this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab"); + this.endpointConfig.principal = config.getString("dataSourceConfig.principal"); + + this.endpointConfig.rmUrls = config.getString("dataSourceConfig.rmUrls").split(","); + + this.topologyConfig.jobFetchSpoutParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_FETCH_SPOUT_NAME); + this.topologyConfig.jobFetchSpoutTasksNum = config.getInt("envContextConfig.tasks." + JOB_FETCH_SPOUT_NAME); + this.topologyConfig.jobParseBoltParallism = config.getInt("envContextConfig.parallelismConfig." + JOB_PARSE_BOLT_NAME); + this.topologyConfig.jobParseBoltTasksNum = config.getInt("envContextConfig.tasks." 
+ JOB_PARSE_BOLT_NAME); + + LOG.info("Successfully initialized SparkRunningJobAppConfig"); + LOG.info("env: " + this.env); + LOG.info("site: " + this.jobExtractorConfig.site); + LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); + LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java new file mode 100644 index 0000000..3d20af7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.spark.running; + +import org.apache.eagle.app.spi.AbstractApplicationProvider; + +public class SparkRunningJobAppProvider extends AbstractApplicationProvider<SparkRunningJobApp> { + @Override + public SparkRunningJobApp getApplication() { + return new SparkRunningJobApp(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java index 749f4d1..fe4a68c 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobMain.java @@ -18,67 +18,8 @@ package org.apache.eagle.jpm.spark.running; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; -import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout; -import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt; - public class SparkRunningJobMain { public static void main(String[] args) { - try { - //1. trigger init conf - SparkRunningConfigManager sparkRunningConfigManager = SparkRunningConfigManager.getInstance(args); - - //2. 
init topology - TopologyBuilder topologyBuilder = new TopologyBuilder(); - String topologyName = sparkRunningConfigManager.getConfig().getString("envContextConfig.topologyName"); - String spoutName = "sparkRunningJobFetchSpout"; - String boltName = "sparkRunningJobParseBolt"; - int parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName); - int tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName); - if (parallelism > tasks) { - parallelism = tasks; - } - topologyBuilder.setSpout( - spoutName, - new SparkRunningJobFetchSpout( - sparkRunningConfigManager.getJobExtractorConfig(), - sparkRunningConfigManager.getEndpointConfig(), - sparkRunningConfigManager.getZkStateConfig()), - parallelism - ).setNumTasks(tasks); - - parallelism = sparkRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + boltName); - tasks = sparkRunningConfigManager.getConfig().getInt("envContextConfig.tasks." 
+ boltName); - if (parallelism > tasks) { - parallelism = tasks; - } - topologyBuilder.setBolt(boltName, - new SparkRunningJobParseBolt( - sparkRunningConfigManager.getZkStateConfig(), - sparkRunningConfigManager.getEagleServiceConfig(), - sparkRunningConfigManager.getEndpointConfig(), - sparkRunningConfigManager.getJobExtractorConfig()), - parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); - - backtype.storm.Config config = new backtype.storm.Config(); - config.setNumWorkers(sparkRunningConfigManager.getConfig().getInt("envContextConfig.workers")); - config.put(Config.TOPOLOGY_DEBUG, true); - if (!sparkRunningConfigManager.getEnv().equals("local")) { - //cluster mode - //parse conf here - StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology()); - } else { - //local mode - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topologyName, config, topologyBuilder.createTopology()); - } - } catch (Exception e) { - e.printStackTrace(); - } + new SparkRunningJobApp().run(args); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java deleted file mode 100644 index b05d12e..0000000 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/common/SparkRunningConfigManager.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.spark.running.common; - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.util.ConfigOptionParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; - -public class SparkRunningConfigManager implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(SparkRunningConfigManager.class); - - public String getEnv() { - return env; - } - private String env; - - public ZKStateConfig getZkStateConfig() { return zkStateConfig; } - private ZKStateConfig zkStateConfig; - - public EagleServiceConfig getEagleServiceConfig() { - return eagleServiceConfig; - } - private EagleServiceConfig eagleServiceConfig; - - public JobExtractorConfig getJobExtractorConfig() { - return jobExtractorConfig; - } - private JobExtractorConfig jobExtractorConfig; - - public EndpointConfig getEndpointConfig() { - return endpointConfig; - } - private EndpointConfig endpointConfig; - - public static class ZKStateConfig implements Serializable { - public String zkQuorum; - public String zkRoot; - public int zkSessionTimeoutMs; - public int zkRetryTimes; - public int zkRetryInterval; - public String zkPort; - public boolean recoverEnabled; - } - - public static class EagleServiceConfig implements 
Serializable { - public String eagleServiceHost; - public int eagleServicePort; - public int readTimeoutSeconds; - public int maxFlushNum; - public String username; - public String password; - } - - public static class JobExtractorConfig implements Serializable { - public String site; - public int fetchRunningJobInterval; - public int parseThreadPoolSize; - } - - public static class EndpointConfig implements Serializable { - public String nnEndpoint; - public String eventLog; - public String[] rmUrls; - public String principal; - public String keyTab; - } - - public Config getConfig() { - return config; - } - private Config config; - - private static SparkRunningConfigManager manager = new SparkRunningConfigManager(); - - private SparkRunningConfigManager() { - this.eagleServiceConfig = new EagleServiceConfig(); - this.jobExtractorConfig = new JobExtractorConfig(); - this.endpointConfig = new EndpointConfig(); - this.zkStateConfig = new ZKStateConfig(); - } - - public static SparkRunningConfigManager getInstance(String[] args) { - manager.init(args); - return manager; - } - - private void init(String[] args) { - try { - LOG.info("Loading from configuration file"); - this.config = new ConfigOptionParser().load(args); - } catch (Exception e) { - LOG.error("failed to load config"); - } - - this.env = config.getString("envContextConfig.env"); - - this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum"); - this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort"); - this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs"); - this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes"); - this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval"); - this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot"); - this.zkStateConfig.recoverEnabled = config.getBoolean("zookeeperConfig.recoverEnabled"); - - // parse eagle service endpoint - 
this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host"); - String port = config.getString("eagleProps.eagleService.port"); - this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port)); - this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username"); - this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password"); - this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds"); - this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum"); - - //parse job extractor - this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site"); - this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval"); - this.jobExtractorConfig.parseThreadPoolSize = config.getInt("jobExtractorConfig.parseThreadPoolSize"); - - //parse data source config - this.endpointConfig.eventLog = config.getString("dataSourceConfig.eventLog"); - this.endpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint"); - this.endpointConfig.keyTab = config.getString("dataSourceConfig.keytab"); - this.endpointConfig.principal = config.getString("dataSourceConfig.principal"); - this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]); - - LOG.info("Successfully initialized SparkRunningConfigManager"); - LOG.info("env: " + this.env); - LOG.info("site: " + this.jobExtractorConfig.site); - LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); - LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java ---------------------------------------------------------------------- 
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java index 5491a80..92adfa8 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.spark.running.parser; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -32,9 +32,9 @@ public class SparkAppEntityCreationHandler { private static final Logger LOG = LoggerFactory.getLogger(SparkAppEntityCreationHandler.class); private List<TaggedLogAPIEntity> entities = new ArrayList<>(); - private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig; + private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig; - public SparkAppEntityCreationHandler(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig) { + public SparkAppEntityCreationHandler(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig) { this.eagleServiceConfig = eagleServiceConfig; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java index bb76213..b2a5b63 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java @@ -20,7 +20,7 @@ package org.apache.eagle.jpm.spark.running.parser; import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.jpm.spark.crawl.EventType; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig; import org.apache.eagle.jpm.spark.running.entities.*; import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager; import org.apache.eagle.jpm.util.Constants; @@ -64,7 +64,7 @@ public class SparkApplicationParser implements Runnable { private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime; private Set<Integer> completeStages; private Configuration hdfsConf; - private SparkRunningConfigManager.EndpointConfig endpointConfig; + private SparkRunningJobAppConfig.EndpointConfig endpointConfig; private final Object lock = new Object(); private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); private Map<String, String> commonTags = new HashMap<>(); @@ -78,9 +78,9 @@ public class SparkApplicationParser implements Runnable { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); } - public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig, - SparkRunningConfigManager.EndpointConfig endpointConfig, - SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig, + public SparkApplicationParser(SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig, + SparkRunningJobAppConfig.EndpointConfig endpointConfig, + SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig, AppInfo app, 
Map<String, SparkAppEntity> sparkApp, SparkRunningJobManager sparkRunningJobManager, ResourceFetcher rmResourceFetcher) { this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java index 2b6c62f..11f7909 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.spark.running.recover; import org.apache.commons.lang3.tuple.Pair; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig; import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity; import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo; @@ -30,7 +30,7 @@ import java.util.*; public class SparkRunningJobManager implements Serializable { private RunningJobManager runningJobManager; - public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) { + public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) { this.runningJobManager = new RunningJobManager(config.zkQuorum, config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot); } 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java index 6be0cfd..256829e 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java @@ -24,7 +24,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig; import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity; import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager; import org.apache.eagle.jpm.util.Constants; @@ -40,18 +40,18 @@ import java.util.*; public class SparkRunningJobFetchSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class); - private SparkRunningConfigManager.ZKStateConfig zkStateConfig; - private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig; - private SparkRunningConfigManager.EndpointConfig endpointConfig; + private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig; + private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig; + private SparkRunningJobAppConfig.EndpointConfig endpointConfig; private ResourceFetcher resourceFetcher; private SpoutOutputCollector 
collector; private boolean init; private transient SparkRunningJobManager sparkRunningJobManager; private Set<String> runningYarnApps; - public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig, - SparkRunningConfigManager.EndpointConfig endpointConfig, - SparkRunningConfigManager.ZKStateConfig zkStateConfig) { + public SparkRunningJobFetchSpout(SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig, + SparkRunningJobAppConfig.EndpointConfig endpointConfig, + SparkRunningJobAppConfig.ZKStateConfig zkStateConfig) { this.jobExtractorConfig = jobExtractorConfig; this.endpointConfig = endpointConfig; this.zkStateConfig = zkStateConfig; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java index 6928240..d207ffc 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java @@ -23,7 +23,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig; import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity; import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser; import 
org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager; @@ -44,17 +44,17 @@ import java.util.concurrent.ExecutorService; public class SparkRunningJobParseBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class); - private SparkRunningConfigManager.ZKStateConfig zkStateConfig; - private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig; - private SparkRunningConfigManager.EndpointConfig endpointConfig; - private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig; + private SparkRunningJobAppConfig.ZKStateConfig zkStateConfig; + private SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig; + private SparkRunningJobAppConfig.EndpointConfig endpointConfig; + private SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig; private ExecutorService executorService; private Map<String, SparkApplicationParser> runningSparkParsers; private ResourceFetcher resourceFetcher; - public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig, - SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig, - SparkRunningConfigManager.EndpointConfig endpointConfig, - SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) { + public SparkRunningJobParseBolt(SparkRunningJobAppConfig.ZKStateConfig zkStateConfig, + SparkRunningJobAppConfig.EagleServiceConfig eagleServiceConfig, + SparkRunningJobAppConfig.EndpointConfig endpointConfig, + SparkRunningJobAppConfig.JobExtractorConfig jobExtractorConfig) { this.zkStateConfig = zkStateConfig; this.eagleServiceConfig = eagleServiceConfig; this.endpointConfig = endpointConfig; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml new file mode 100644 index 0000000..24cf09e --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider.xml @@ -0,0 +1,195 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<application> + <type>SPARK_RUNNING_JOB_APP</type> + <name>Spark Running Job Monitoring</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.jpm.spark.running.SparkRunningJobApp</appClass> + <viewPath>/apps/jpm</viewPath> + <configuration> + <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig --> + <property> + <name>envContextConfig.env</name> + <value>local</value> + <displayName>Environment</displayName> + <description>Execution environment</description> + </property> + <property> + <name>zookeeperConfig.zkQuorum</name> + <displayName>zkQuorum</displayName> + <description>Zookeeper Quorum</description> + <value>sandbox.hortonworks.com:2181</value> + </property> + <property> + <name>zookeeperConfig.zkPort</name> + <displayName>zkPort</displayName> + <description>Zookeeper Port</description> + <value>2181</value> + </property> + <property> + <name>zookeeperConfig.zkSessionTimeoutMs</name> + <displayName>zkSessionTimeoutMs</displayName> + <description>Zookeeper session timeoutMs</description> + <value>15000</value> + </property> + <property> + <name>zookeeperConfig.zkRetryTimes</name> + <displayName>zkRetryTimes</displayName> + <description>zookeeperConfig.zkRetryTimes</description> + <value>3</value> + </property> + <property> + <name>zookeeperConfig.zkRetryInterval</name> + <displayName>zkRetryInterval</displayName> + <description>zookeeperConfig.zkRetryInterval</description> + <value>20000</value> + </property> + <property> + <name>zookeeperConfig.zkRoot</name> + <value>/apps/spark/running</value> + </property> + <property> + <name>zookeeperConfig.recoverEnabled</name> + <description>zookeeperConfig.recoverEnabled</description> + <value>false</value> + </property> + <property> + <name>eagleProps.eagleService.host</name> + <description>eagleProps.eagleService.host</description> + <value>sandbox.hortonworks.com</value> + </property> + <property> + <name>eagleProps.eagleService.port</name> + 
<description>eagleProps.eagleService.port</description> + <value>9099</value> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <description>eagleProps.eagleService.username</description> + <value>admin</value> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <description>eagleProps.eagleService.password</description> + <value>secret</value> + </property> + <property> + <name>eagleProps.eagleService.readTimeOutSeconds</name> + <description>eagleProps.eagleService.readTimeOutSeconds</description> + <value>20</value> + </property> + <property> + <name>eagleProps.eagleService.maxFlushNum</name> + <description>eagleProps.eagleService.maxFlushNum</description> + <value>500</value> + </property> + <property> + <name>jobExtractorConfig.site</name> + <description>jobExtractorConfig.site</description> + <value>sandbox</value> + </property> + <property> + <name>jobExtractorConfig.fetchRunningJobInterval</name> + <description>jobExtractorConfig.fetchRunningJobInterval</description> + <value>15</value> + </property> + <property> + <name>jobExtractorConfig.parseThreadPoolSize</name> + <description>jobExtractorConfig.parseThreadPoolSize</description> + <value>5</value> + </property> + <property> + <name>dataSourceConfig.eventLog</name> + <description>dataSourceConfig.eventLog</description> + <value>/spark-history</value> + </property> + <property> + <name>dataSourceConfig.nnEndpoint</name> + <description>dataSourceConfig.nnEndpoint</description> + <value>hdfs://sandbox.hortonworks.com:8020</value> + </property> + <property> + <name>dataSourceConfig.keytab</name> + <description>dataSourceConfig.keytab</description> + <value></value> + </property> + <property> + <name>dataSourceConfig.principal</name> + <description>dataSourceConfig.principal</description> + <value></value> + </property> + <property> + <name>dataSourceConfig.rmUrls</name> + <description>dataSourceConfig.rmUrls</description> + 
<value>http://sandbox.hortonworks.com:8088</value> + </property> + <property> + <name>envContextConfig.parallelismConfig.sparkRunningJobFetchSpout</name> + <description>Parallelism of sparkRunningJobFetchSpout </description> + <value>1</value> + </property> + <property> + <name>envContextConfig.tasks.sparkRunningJobFetchSpout</name> + <description>Tasks Num of sparkRunningJobFetchSpout </description> + <value>4</value> + </property> + <property> + <name>envContextConfig.parallelismConfig.sparkRunningJobParseBolt</name> + <description>Parallelism of sparkRunningJobParseBolt </description> + <value>1</value> + </property> + <property> + <name>envContextConfig.tasks.sparkRunningJobParseBolt</name> + <description>Tasks Num of sparkRunningJobParseBolt</description> + <value>4</value> + </property> + </configuration> + <docs> + <install> + # Step 1: Create source kafka topic named "${site}_example_source_topic" + + ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1 + + # Step 2: Set up data collector to flow data into kafka topic + + ./bin/logstash -f log_collector.conf + + ## `log_collector.conf` sample as follows: + + input { + + } + filter { + + } + output{ + + } + + # Step 3: start application + + # Step 4: monitor with featured portal or alert with policies + </install> + <uninstall> + # Step 1: stop and uninstall application + # Step 2: delete kafka topic named "${site}_example_source_topic" + # Step 3: stop logstash + </uninstall> + </docs> +</application> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider 
b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..6aef879 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf index d93a135..9d9f622 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf @@ -15,8 +15,6 @@ { "envContextConfig" : { - "env" : "local", - "topologyName" : "sparkRunningJob", "stormConfigFile" : "storm.yaml", "parallelismConfig" : { "sparkRunningJobFetchSpout" : 1, @@ -28,15 +26,13 @@ }, "workers" : 2 }, - "jobExtractorConfig" : { "site" : "sandbox", "fetchRunningJobInterval" : 15, "parseThreadPoolSize" : 5 }, - "dataSourceConfig" : { - "rmUrls": ["http://sandbox.hortonworks.com:8088"], + "rmUrls": "http://sandbox.hortonworks.com:8088", "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020", "principal" : "", #if not need, then empty "keytab" : "", @@ -52,7 +48,8 @@ "zkRetryTimes" : 3, "zkRetryInterval" : 20000 }, - + "appId":"sparkRunningJob", + "mode":"LOCAL", "eagleProps" : { "mailHost" : "abc.com", "mailDebug" : "true", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java new file mode 100644 index 0000000..346171a --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-running/src/test/java/SparkRunningJobAppProviderTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.google.inject.Inject; +import org.apache.eagle.app.test.ApplicationSimulator; +import org.apache.eagle.app.test.ApplicationTestBase; +import org.apache.eagle.jpm.spark.running.SparkRunningJobAppProvider; +import org.junit.Test; + +public class SparkRunningJobAppProviderTest extends ApplicationTestBase { + @Inject + ApplicationSimulator simulator; + + @Test + public void testRunWithProvider(){ + simulator.start(SparkRunningJobAppProvider.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/acee5cb3/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 04589f9..72d0ad7 100755 --- a/pom.xml +++ b/pom.xml @@ -1190,6 +1190,7 @@ <exclude>README*</exclude> <exclude>**/*.log</exclude> <exclude>**/*.out</exclude> + <exclude>**/*.db</exclude> <exclude>**/eagle.log*</exclude> <exclude>**/velocity.log*</exclude> <!-- all json files should be excluded -->