APEX-42: Added support for configuring unifier attributes through configuration file
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b78c67b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b78c67b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b78c67b Branch: refs/heads/feature-module Commit: 9b78c67b68dd42b8216457e886c3cf221b9275b2 Parents: 282c43b Author: Chaitanya <[email protected]> Authored: Mon Sep 21 17:30:00 2015 +0530 Committer: Chaitanya <[email protected]> Committed: Mon Sep 21 18:13:17 2015 +0530 ---------------------------------------------------------------------- .../plan/logical/LogicalPlanConfiguration.java | 37 ++++++++++++++++--- .../logical/LogicalPlanConfigurationTest.java | 39 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 7a53cd7..6b141bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -120,7 +120,7 @@ public class LogicalPlanConfiguration { */ protected enum StramElement { APPLICATION("application"), GATEWAY("gateway"), TEMPLATE("template"), OPERATOR("operator"),STREAM("stream"), PORT("port"), INPUT_PORT("inputport"),OUTPUT_PORT("outputport"), - ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path"); + ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path"),UNIFIER("unifier"); private final String value; /** @@ -168,7 +168,8 @@ public class LogicalPlanConfiguration { GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null), OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class), STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null), - PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class); + PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class), + UNIFIER(StramElement.UNIFIER, ConfElement.PORT, null, null); protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap(); protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap(); @@ -182,6 +183,7 @@ public class LogicalPlanConfiguration { STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE)); APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM)); OPERATOR.setChildren(Sets.newHashSet(PORT)); + PORT.setChildren(Sets.newHashSet(UNIFIER)); STRAM_ELEMENT_TO_CONF_ELEMENT.clear(); @@ -1126,7 +1128,7 @@ public class LogicalPlanConfiguration { private final Map<String, String> appAliases = Maps.newHashMap(); private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.APPLICATION, StramElement.GATEWAY, StramElement.TEMPLATE, StramElement.OPERATOR, - StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR}; + StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR, StramElement.UNIFIER}; StramConf() { } @@ -1151,7 +1153,7 @@ public class LogicalPlanConfiguration { private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.GATEWAY, StramElement.OPERATOR, StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.ATTR, StramElement.CLASS, StramElement.PATH, - StramElement.PROP}; + StramElement.PROP, StramElement.UNIFIER}; @SuppressWarnings("unused") AppConf() { @@ -1446,7 +1448,7 @@ public class LogicalPlanConfiguration { */ private static class PortConf extends Conf { - private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR}; + private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR, StramElement.UNIFIER}; @SuppressWarnings("unused") PortConf() { @@ -1477,6 +1479,7 @@ public class LogicalPlanConfiguration { elementMaps.put(StramElement.PORT, PortConf.class); elementMaps.put(StramElement.INPUT_PORT, PortConf.class); elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class); + elementMaps.put(StramElement.UNIFIER, OperatorConf.class); } /** @@ -1746,6 +1749,8 @@ public class LogicalPlanConfiguration { parseAppElement(index, keys, element, conf, propertyName, propertyValue); } else if (element == StramElement.GATEWAY) { parseGatewayElement(element, conf, keys, index, propertyName, propertyValue); + } else if ((element == StramElement.UNIFIER)) { + parseUnifierElement(element, conf, keys, index, propertyName, propertyValue); } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) { parseAttributeElement(element, keys, index, conf, propertyValue, propertyName); } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) { @@ -1798,6 +1803,24 @@ public class LogicalPlanConfiguration { } /** + * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a unifier element. + * @param element The current {@link StramElement} of the property being parsed. + * @param keys The keys that the property being parsed was split into. + * @param index The current key that the parser is on. + * @param propertyValue The value associated with the property being parsed. + * @param propertyName The complete unprocessed name of the property being parsed. + */ + private void parseUnifierElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue) + { + Conf elConf = addConf(element, null, conf1); + if (elConf != null) { + parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf); + } else { + LOG.error("Invalid configuration key: {}", propertyName); + } + } + + /** * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute. * @param element The current {@link StramElement} of the property being parsed. * @param keys The keys that the property being parsed was split into. @@ -2292,6 +2315,10 @@ public class LogicalPlanConfiguration { List<PortConf> portConfs = getMatchingChildConf(opConfs, om.getPortName(), StramElement.PORT); outPortConfs.addAll(portConfs); setAttributes(outPortConfs, om.getAttributes()); + List<OperatorConf> unifConfs = getMatchingChildConf(outPortConfs, null, StramElement.UNIFIER); + if(unifConfs.size() != 0) { + setAttributes(unifConfs, om.getUnifierMeta().getAttributes()); + } } ow.populateAggregatorMeta(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java index 077e3a9..9b2003b 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java @@ -525,6 +525,45 @@ public class LogicalPlanConfigurationTest { } @Test + @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"}) + public void testUnifierLevelAttributes() { + String appName = "app1"; + final GenericTestOperator operator1 = new GenericTestOperator(); + final GenericTestOperator operator2 = new GenericTestOperator(); + StreamingApplication app = new StreamingApplication() { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.addOperator("operator1", operator1); + dag.addOperator("operator2", operator2); + dag.addStream("s1", operator1.outport1, operator2.inport1); + } + }; + + Properties props = new Properties(); + props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); + props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2"); + props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512"); + LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); + dagBuilder.addFromProperties(props, null); + + String appPath = app.getClass().getName().replace(".", "/") + ".class"; + + LogicalPlan dag = new LogicalPlan(); + dagBuilder.prepareDAG(dag, app, appPath); + + OperatorMeta om = null; + for (Map.Entry<OutputPortMeta, StreamMeta> entry : dag.getOperatorMeta("operator1").getOutputStreams().entrySet()) { + if(entry.getKey().getPortName().equals("outport1")) { + om = entry.getKey().getUnifierMeta(); + } + } + Assert.assertNotNull(om); + Assert.assertEquals("", Integer.valueOf(2), om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)); + Assert.assertEquals("", Integer.valueOf(512), om.getValue(OperatorContext.MEMORY_MB)); + } + + @Test public void testOperatorLevelProperties() { String appName = "app1"; final GenericTestOperator operator1 = new GenericTestOperator();
