Repository: apex-core Updated Branches: refs/heads/master 10650b3a0 -> 3024b06e1
Plugin infrastructure to setup DAG before application launch. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8d46cc6d Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8d46cc6d Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8d46cc6d Branch: refs/heads/master Commit: 8d46cc6d1256d00ad14024dbf12c601835af14f6 Parents: ad4210b Author: Tushar R. Gosavi <[email protected]> Authored: Mon Mar 6 14:45:53 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Tue Mar 14 15:37:55 2017 +0530 ---------------------------------------------------------------------- .../org/apache/apex/api/DAGSetupPlugin.java | 137 +++++++++++++++++++ .../common/util/BaseDAGSetupPlugin.java | 76 ++++++++++ .../datatorrent/stram/client/AppPackage.java | 2 - .../plan/logical/DAGSetupPluginManager.java | 126 +++++++++++++++++ .../plan/logical/LogicalPlanConfiguration.java | 66 ++++++--- .../apex/engine/util/StreamingAppFactory.java | 4 +- .../stram/plan/logical/DAGSetupPluginTests.java | 115 ++++++++++++++++ .../plan/logical/PropertyInjectorVisitor.java | 119 ++++++++++++++++ .../stram/webapp/OperatorDiscoveryTest.java | 17 ++- .../src/test/resources/visitortests.properties | 20 +++ 10 files changed, 656 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java new file mode 100644 index 0000000..d2e7199 --- /dev/null +++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.api; + +import java.io.Serializable; +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; + +/** + * DAGSetupPlugin allows user provided code to run at various stages + * during DAG preparation. Currently following stages are supported + * + * <ul> + * <li>Before dag is populated</li> + * <li>After dag is populated</li> + * <li>Before dag is configured</li> + * <li>After dag is configured</li> + * <li>Before dag is validated</li> + * <li>After dag is validated</li> + * </ul> + */ [email protected] +public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginContext>, Serializable +{ + + /** + * This method is called before platform adds operators and streams in the DAG. i.e this method + * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} + * + * For Application specified using property and json file format, this method will get called + * before platform adds operators and streams in the DAG as per specification in the file. + */ + void prePopulateDAG(); + + /** + * This method is called after platform adds operators and streams in the DAG. i.e this method + * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)} + * in case application is specified in java. + * + * For Application specified using property and json file format, this method will get called + * after platform has added operators and streams in the DAG as per specification in the file. + */ + void postPopulateDAG(); + + /** + * This is method is called before DAG is configured, i.e operator and application + * properties/attributes are injected from configuration files. + */ + void preConfigureDAG(); + + /** + * This is method is called after DAG is configured, i.e operator and application + * properties/attributes are injected from configuration files. + */ + void postConfigureDAG(); + + /** + * This method is called just before dag is validated before final job submission. + */ + void preValidateDAG(); + + /** + * This method is called after dag is validated. If plugin makes in incompatible changes + * to the DAG at this stage, then application may get launched incorrectly or application + * launch may fail. + */ + void postValidateDAG(); + + public static class DAGSetupPluginContext implements Context + { + private final DAG dag; + private final Configuration conf; + + public DAGSetupPluginContext(DAG dag, Configuration conf) + { + this.dag = dag; + this.conf = conf; + } + + public DAG getDAG() + { + return dag; + } + + public Configuration getConfiguration() + { + return conf; + } + + @Override + public Attribute.AttributeMap getAttributes() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public <T> T getValue(Attribute<T> key) + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setCounters(Object counters) + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + throw new UnsupportedOperationException("Not supported yet."); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java new file mode 100644 index 0000000..7d26b89 --- /dev/null +++ b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.common.util; + +import org.apache.apex.api.DAGSetupPlugin; + +/** + * Base class for DAGSetupPlugin implementations that provides empty implementations + * for all interface methods. + */ +public class BaseDAGSetupPlugin implements DAGSetupPlugin +{ + @Override + public void setup(DAGSetupPluginContext context) + { + + } + + @Override + public void prePopulateDAG() + { + + } + + @Override + public void teardown() + { + + } + + @Override + public void postPopulateDAG() + { + + } + + @Override + public void preConfigureDAG() + { + + } + + @Override + public void postConfigureDAG() + { + + } + + @Override + public void preValidateDAG() + { + + } + + @Override + public void postValidateDAG() + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index addf68e..e6b4b7f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java @@ -355,7 +355,6 @@ public class AppPackage extends JarFile appInfo.displayName = appFactory.getDisplayName(); try { appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); - appInfo.dag.validate(); } catch (Throwable ex) { appInfo.error = ex.getMessage(); appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex); @@ -394,7 +393,6 @@ public class AppPackage extends JarFile appInfo.displayName = appFactory.getDisplayName(); try { appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); - appInfo.dag.validate(); } catch (Throwable t) { appInfo.error = t.getMessage(); appInfo.errorStackTrace = ExceptionUtils.getStackTrace(t); http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java new file mode 100644 index 0000000..ad37071 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; + +import org.apache.apex.api.DAGSetupPlugin; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.stram.StramUtils; + +import static org.slf4j.LoggerFactory.getLogger; + +public class DAGSetupPluginManager +{ + private static final Logger LOG = getLogger(DAGSetupPluginManager.class); + + private final transient List<DAGSetupPlugin> plugins = new ArrayList<>(); + private Configuration conf; + + public static final String DAGSETUP_PLUGINS_CONF_KEY = "org.apache.apex.api"; + private DAGSetupPlugin.DAGSetupPluginContext contex; + + private void loadVisitors(Configuration conf) + { + this.conf = conf; + if (!plugins.isEmpty()) { + return; + } + + String classNamesStr = conf.get(DAGSETUP_PLUGINS_CONF_KEY); + if (classNamesStr == null) { + return; + } + String[] classNames = classNamesStr.split(","); + for (String className : classNames) { + try { + Class<? extends DAGSetupPlugin> plugin = StramUtils.classForName(className, DAGSetupPlugin.class); + plugins.add(StramUtils.newInstance(plugin)); + LOG.info("Found DAG setup plugin {}", plugin); + } catch (IllegalArgumentException e) { + LOG.warn("Could not load plugin {}", className); + } + } + } + + public void setup(DAGSetupPlugin.DAGSetupPluginContext context) + { + this.contex = context; + for (DAGSetupPlugin plugin : plugins) { + plugin.setup(context); + } + } + + public enum DispatchType + { + SETUP, + PRE_POPULATE, + POST_POPULATE, + PRE_CONFIGURE, + POST_CONFIGURE, + PRE_VALIDATE, + POST_VALIDATE, + TEARDOWN + } + + public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context) + { + for (DAGSetupPlugin plugin : plugins) { + switch (type) { + case SETUP: + plugin.setup(context); + break; + case PRE_POPULATE: + plugin.prePopulateDAG(); + break; + case POST_POPULATE: + plugin.postPopulateDAG(); + break; + case PRE_CONFIGURE: + plugin.preConfigureDAG(); + break; + case POST_CONFIGURE: + plugin.postValidateDAG(); + break; + case PRE_VALIDATE: + plugin.preValidateDAG(); + break; + case POST_VALIDATE: + plugin.postValidateDAG(); + break; + case TEARDOWN: + plugin.teardown(); + break; + default: + throw new UnsupportedOperationException("Not implemented "); + } + } + } + + public static synchronized DAGSetupPluginManager getInstance(Configuration conf) + { + DAGSetupPluginManager manager = new DAGSetupPluginManager(); + manager.loadVisitors(conf); + return manager; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index bab414f..ffe33f3 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -48,6 +48,7 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.api.DAGSetupPlugin.DAGSetupPluginContext; import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections.CollectionUtils; @@ -85,6 +86,15 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.util.ObjectMapperFactory; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP; +import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN; + /** * * Builder for the DAG logical representation of operators and streams from properties.<p> @@ -134,6 +144,8 @@ public class LogicalPlanConfiguration LOG.debug("Initialized attributes {}", serial); } + private final DAGSetupPluginManager pluginManager; + /** * This represents an element that can be referenced in a DT property. */ @@ -1640,6 +1652,7 @@ public class LogicalPlanConfiguration { this.conf = conf; this.addFromConfiguration(conf); + this.pluginManager = DAGSetupPluginManager.getInstance(conf); } /** @@ -2052,44 +2065,58 @@ public class LogicalPlanConfiguration return Collections.unmodifiableMap(this.stramConf.appAliases); } - public LogicalPlan createFromProperties(Properties props, String appName) throws IOException + private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName) { - // build DAG from properties - LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false)); - tb.addFromProperties(props, conf); LogicalPlan dag = new LogicalPlan(); + DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); + pluginManager.dispatch(SETUP, context); + pluginManager.dispatch(PRE_POPULATE, context); tb.populateDAG(dag); // configure with embedded settings tb.prepareDAG(dag, null, appName); + pluginManager.dispatch(POST_POPULATE, context); // configure with external settings prepareDAG(dag, null, appName); + pluginManager.dispatch(PRE_VALIDATE, context); + dag.validate(); + pluginManager.dispatch(POST_VALIDATE, context); + pluginManager.dispatch(TEARDOWN, context); return dag; } + public LogicalPlan createFromProperties(Properties props, String appName) throws IOException + { + // build DAG from properties + LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false)); + tb.addFromProperties(props, conf); + return populateDAGAndValidate(tb, appName); + } + public LogicalPlan createFromJson(JSONObject json, String appName) throws Exception { // build DAG from properties LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false)); tb.addFromJson(json, conf); - LogicalPlan dag = new LogicalPlan(); - tb.populateDAG(dag); - // configure with embedded settings - tb.prepareDAG(dag, null, appName); - // configure with external settings - prepareDAG(dag, null, appName); - return dag; + return populateDAGAndValidate(tb, appName); } public LogicalPlan createEmptyForRecovery(String appName) { // build DAG from properties LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false)); + return populateDAGAndValidate(tb, appName); + } + + public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName) + { LogicalPlan dag = new LogicalPlan(); - tb.populateDAG(dag); - // configure with embedded settings - tb.prepareDAG(dag, null, appName); - // configure with external settings - prepareDAG(dag, null, appName); + DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); + pluginManager.dispatch(SETUP, context); + prepareDAG(dag, app, appName); + pluginManager.dispatch(PRE_VALIDATE, context); + dag.validate(); + pluginManager.dispatch(POST_VALIDATE, context); + pluginManager.dispatch(TEARDOWN, context); return dag; } @@ -2186,7 +2213,6 @@ public class LogicalPlanConfiguration } } } - } private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator) @@ -2222,9 +2248,14 @@ public class LogicalPlanConfiguration // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName()); dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress); + DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf); if (app != null) { + pluginManager.dispatch(SETUP, context); + pluginManager.dispatch(PRE_POPULATE, context); app.populateDAG(dag, conf); + pluginManager.dispatch(POST_POPULATE, context); } + pluginManager.dispatch(PRE_CONFIGURE, context); String appAlias = getAppAlias(name); String appName = appAlias == null ? name : appAlias; List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION); @@ -2240,6 +2271,7 @@ public class LogicalPlanConfiguration // inject external operator configuration setOperatorConfiguration(dag, appConfs, appName); setStreamConfiguration(dag, appConfs, appName); + pluginManager.dispatch(POST_CONFIGURE, context); } private void flattenDAG(LogicalPlan dag, Configuration conf) http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java index 96edc8d..02c0910 100644 --- a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java +++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java @@ -44,9 +44,7 @@ public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig) { - LogicalPlan dag = new LogicalPlan(); - planConfig.prepareDAG(dag, app, getName()); - return dag; + return planConfig.createFromStreamingApplication(app, getName()); } public String getName() http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java new file mode 100644 index 0000000..25201eb --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.engine.util.StreamingAppFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.stram.StramUtils; +import com.datatorrent.stram.client.StramAppLauncher; +import com.datatorrent.stram.engine.GenericTestOperator; +import com.datatorrent.stram.engine.TestGeneratorInputOperator; + +public class DAGSetupPluginTests +{ + + public static class Application implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TestGeneratorInputOperator inputOperator = dag.addOperator("inputOperator", new TestGeneratorInputOperator()); + GenericTestOperator operator1 = dag.addOperator("operator1", new GenericTestOperator()); + GenericTestOperator operator2 = dag.addOperator("operator2", new GenericTestOperator()); + GenericTestOperator operator3 = dag.addOperator("operator3", new GenericTestOperator()); + GenericTestOperator operator4 = dag.addOperator("operator4", new GenericTestOperator()); + + dag.addStream("n1n2", operator1.outport1, operator2.inport1); + dag.addStream("inputStream", inputOperator.outport, operator1.inport1, operator3.inport1, operator4.inport1); + } + } + + private Configuration getConfiguration() + { + Configuration conf = new Configuration(); + conf.set(DAGSetupPluginManager.DAGSETUP_PLUGINS_CONF_KEY, "com.datatorrent.stram.plan.logical.PropertyInjectorVisitor"); + conf.set("propertyVisitor.Path","/visitortests.properties"); + return conf; + } + + @Test + public void testJavaApplication() + { + Configuration conf = getConfiguration(); + StreamingAppFactory factory = new StreamingAppFactory(Application.class.getName(), Application.class) + { + @Override + public LogicalPlan createApp(LogicalPlanConfiguration planConfig) + { + Class<? extends StreamingApplication> c = StramUtils.classForName(Application.class.getName(), StreamingApplication.class); + StreamingApplication app = StramUtils.newInstance(c); + return super.createApp(app, planConfig); + } + }; + LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf)); + validateProperties(dag); + } + + @Test + public void testPropertyFileApp() throws IOException + { + File tempFile = File.createTempFile("testTopology", "properties"); + org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.properties"), new FileOutputStream(tempFile)); + StramAppLauncher.PropertyFileAppFactory factory = new StramAppLauncher.PropertyFileAppFactory(tempFile); + Configuration conf = getConfiguration(); + LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf)); + validateProperties(dag); + tempFile.delete(); + } + + @Test + public void testJsonFileApp() throws IOException + { + File tempFile = File.createTempFile("testTopology", "json"); + org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.json"), new FileOutputStream(tempFile)); + StramAppLauncher.JsonFileAppFactory factory = new StramAppLauncher.JsonFileAppFactory(tempFile); + Configuration conf = getConfiguration(); + LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf)); + validateProperties(dag); + tempFile.delete(); + } + + protected void validateProperties(LogicalPlan dag) + { + String[] operators = new String[]{"operator1", "operator2", "operator3", "operator4"}; + for (String name : operators) { + GenericTestOperator op = (GenericTestOperator)dag.getOperatorMeta(name).getOperator(); + Assert.assertEquals("property set on operator ", op.getMyStringProperty(), "mynewstringvalue"); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java new file mode 100644 index 0000000..d2bd927 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import javax.validation.ValidationException; + +import org.slf4j.Logger; + +import org.apache.apex.api.DAGSetupPlugin; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; + +import static org.slf4j.LoggerFactory.getLogger; + +public class PropertyInjectorVisitor implements DAGSetupPlugin +{ + private static final Logger LOG = getLogger(PropertyInjectorVisitor.class); + + private String path; + private Map<String, String> propertyMap = new HashMap<>(); + private DAG dag; + + @Override + public void setup(DAGSetupPluginContext context) + { + this.dag = context.getDAG(); + try { + this.path = context.getConfiguration().get("propertyVisitor.Path"); + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream(path)); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + propertyMap.put(entry.getKey().toString(), entry.getValue().toString()); + } + } catch (IOException ex) { + throw new ValidationException("Not able to load input file " + path); + } + } + + @Override + public void prePopulateDAG() + { + + } + + @Override + public void postPopulateDAG() + { + + } + + @Override + public void preConfigureDAG() + { + + } + + @Override + public void postConfigureDAG() + { + + } + + @Override + public void preValidateDAG() + { + for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) { + Operator o = ometa.getOperator(); + LogicalPlanConfiguration.setOperatorProperties(o, propertyMap); + } + } + + @Override + public void postValidateDAG() + { + + } + + public PropertyInjectorVisitor() + { + } + + public String getPath() + { + return path; + } + + public void setPath(String path) + { + this.path = path; + } + + @Override + public void teardown() + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java index 6552015..9ac28c0 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java @@ -918,6 +918,15 @@ public class OperatorDiscoveryTest } + public static class InputTestOperator<T, Z extends Map<String, Number>> extends TestOperator<T,Z> implements InputOperator + { + @Override + public void emitTuples() + { + + } + } + static class ExtendedOperator extends TestOperator<String, Map<String, Number>> { } @@ -1047,7 +1056,7 @@ public class OperatorDiscoveryTest @Test public void testLogicalPlanConfiguration() throws Exception { - TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>(); + TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>(); bean.map.put("key1", new Structured()); bean.stringArray = new String[]{"one", "two", "three"}; bean.stringList = Lists.newArrayList("four", "five"); @@ -1074,7 +1083,7 @@ public class OperatorDiscoveryTest jsonPlan.put("streams", new JSONArray()); JSONObject jsonOper = new JSONObject(); jsonOper.put("name", "Test Operator"); - jsonOper.put("class", TestOperator.class.getName()); + jsonOper.put("class", InputTestOperator.class.getName()); jsonOper.put("properties", jsonObj); jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper))); @@ -1083,9 +1092,9 @@ public class OperatorDiscoveryTest // create logical plan from the json LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest"); OperatorMeta om = lp.getOperatorMeta("Test Operator"); - Assert.assertTrue(om.getOperator() instanceof TestOperator); + Assert.assertTrue(om.getOperator() instanceof InputTestOperator); @SuppressWarnings("rawtypes") - TestOperator beanBack = (TestOperator)om.getOperator(); + TestOperator beanBack = (InputTestOperator)om.getOperator(); // The operator deserialized back from json should be same as original operator Assert.assertEquals(bean.map, beanBack.map); http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/resources/visitortests.properties ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/visitortests.properties b/engine/src/test/resources/visitortests.properties new file mode 100644 index 0000000..620c53b --- /dev/null +++ b/engine/src/test/resources/visitortests.properties @@ -0,0 +1,20 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +myStringProperty=mynewstringvalue
