Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 7503dde51 -> 454feccac
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java index 3b6bdd1..218156b 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java @@ -15,28 +15,6 @@ */ package com.datatorrent.stram.plan; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.lang.reflect.Field; -import java.util.*; - -import javax.validation.ValidationException; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import org.codehaus.jettison.json.JSONObject; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.hadoop.conf.Configuration; - import com.datatorrent.api.*; import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer; import com.datatorrent.api.Context.DAGContext; @@ -46,20 +24,48 @@ import com.datatorrent.api.StringCodec.Integer2String; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.common.codec.JsonStreamCodec; import com.datatorrent.common.util.BasicContainerOptConfigurator; +import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.PartitioningTest.PartitionLoadWatch; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.engine.GenericTestOperator; import com.datatorrent.stram.engine.TestGeneratorInputOperator; import com.datatorrent.stram.plan.LogicalPlanTest.ValidationTestOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.AttributeParseUtils; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ConfElement; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.ContextUtils; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration.StramElement; +import com.datatorrent.stram.plan.logical.MockStorageAgent; import com.datatorrent.stram.support.StramTestSupport.RegexMatcher; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.lang.reflect.Field; +import javax.validation.ValidationException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.conf.Configuration; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; import static org.junit.Assert.*; + + public class LogicalPlanConfigurationTest { private static OperatorMeta assertNode(LogicalPlan dag, String id) { @@ -75,7 +81,6 @@ public class LogicalPlanConfigurationTest { public void testLoadFromConfigXml() { Configuration conf = new Configuration(false); conf.addResource(StramClientUtils.DT_SITE_XML_FILE); - //Configuration.dumpConfiguration(conf, new PrintWriter(System.out)); LogicalPlanConfiguration builder = new LogicalPlanConfiguration(conf); @@ -117,7 +122,7 @@ public class LogicalPlanConfigurationTest { assertEquals("operator 2 number of outputs", 1, operator2.getOutputStreams().size()); StreamMeta fromNode2 = operator2.getOutputStreams().values().iterator().next(); - Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>(); + Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) { targetNodes.add(ip.getOperatorWrapper()); } @@ -191,7 +196,7 @@ public class LogicalPlanConfigurationTest { StreamMeta input1 = dag.getStream("inputStream"); assertNotNull(input1); Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta()); - Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>(); + Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { targetNodes.add(targetPort.getOperatorWrapper()); } @@ -221,11 +226,11 @@ public class LogicalPlanConfigurationTest { dag.validate(); assertEquals("DAG attribute CONTAINER_JVM_OPTIONS ", dag.getAttributes().get(DAGContext.CONTAINER_JVM_OPTIONS), "-Xmx16m"); - Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = new HashMap<Class<?>, Class<? extends StringCodec<?>>>(); + Map<Class<?>, Class<? extends StringCodec<?>>> stringCodecsMap = Maps.newHashMap(); stringCodecsMap.put(Integer.class, Integer2String.class); assertEquals("DAG attribute STRING_CODECS ", stringCodecsMap, dag.getAttributes().get(DAGContext.STRING_CODECS)); assertEquals("DAG attribute CONTAINER_OPTS_CONFIGURATOR ", BasicContainerOptConfigurator.class, dag.getAttributes().get(DAGContext.CONTAINER_OPTS_CONFIGURATOR).getClass()); - + assertEquals("number of operator confs", 5, dag.getAllOperators().size()); assertEquals("number of root operators", 1, dag.getRootOperators().size()); @@ -241,7 +246,7 @@ public class LogicalPlanConfigurationTest { for(OutputPortMeta opm : input.getOutputStreams().keySet()){ assertTrue("output port of input Operator attribute is JsonStreamCodec ", opm.getAttributes().get(PortContext.STREAM_CODEC) instanceof JsonStreamCodec<?>); } - + OperatorMeta operator3 = dag.getOperatorMeta("operator3"); assertEquals("operator3.classname", GenericTestOperator.class, operator3.getOperator().getClass()); @@ -259,7 +264,7 @@ public class LogicalPlanConfigurationTest { assertNotNull(input1); OperatorMeta inputOperator = dag.getOperatorMeta("inputOperator"); Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta()); - Set<OperatorMeta> targetNodes = new HashSet<OperatorMeta>(); + Set<OperatorMeta> targetNodes = Sets.newHashSet(); for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { targetNodes.add(targetPort.getOperatorWrapper()); } @@ -269,6 +274,7 @@ public class LogicalPlanConfigurationTest { } @Test + @SuppressWarnings("UnnecessaryBoxing") public void testAppLevelAttributes() { String appName = "app1"; @@ -296,6 +302,7 @@ public class LogicalPlanConfigurationTest { } @Test + @SuppressWarnings("UnnecessaryBoxing") public void testAppLevelProperties() { String appName ="app1"; Properties props =new Properties(); @@ -315,6 +322,7 @@ public class LogicalPlanConfigurationTest { Assert.assertEquals("",Integer.valueOf(1000),app1Test.getTestprop3()); Assert.assertEquals("",Integer.valueOf(10000),app1Test.getInncls().getA()); } + @Test public void testPrepareDAG() { final MutableBoolean appInitialized = new MutableBoolean(false); @@ -365,6 +373,7 @@ public class LogicalPlanConfigurationTest { Operator operator3 = dag.addOperator("operator3", new GenericTestOperator()); LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false)); + LOG.debug("calling addFromProperties"); pb.addFromProperties(props, null); Map<String, String> configProps = pb.getProperties(dag.getMeta(operator1), "appName"); @@ -415,7 +424,6 @@ public class LogicalPlanConfigurationTest { @Override public void populateDAG(DAG dag, Configuration conf) { - //dag.setAttribute(DAGContext.APPLICATION_NAME, "testApp"); } } @@ -478,6 +486,7 @@ public class LogicalPlanConfigurationTest { } @Test + @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"}) public void testOperatorLevelAttributes() { String appName = "app1"; StreamingApplication app = new StreamingApplication() { @@ -581,22 +590,10 @@ public class LogicalPlanConfigurationTest { } @Test + @SuppressWarnings("UnnecessaryBoxing") public void testPortLevelAttributes() { String appName = "app1"; - final GenericTestOperator gt1 = new GenericTestOperator(); - final GenericTestOperator gt2 = new GenericTestOperator(); - final GenericTestOperator gt3 = new GenericTestOperator(); - StreamingApplication app = new StreamingApplication() { - @Override - public void populateDAG(DAG dag, Configuration conf) - { - dag.addOperator("operator1", gt1); - dag.addOperator("operator2", gt2); - dag.addOperator("operator3", gt3); - dag.addStream("s1", gt1.outport1, gt2.inport1); - dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2); - } - }; + SimpleTestApplication app = new SimpleTestApplication(); Properties props = new Properties(); props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName()); @@ -613,33 +610,31 @@ public class LogicalPlanConfigurationTest { LogicalPlan dag = new LogicalPlan(); dagBuilder.prepareDAG(dag, app, appPath); - //dagBuilder.populateDAG(dag, new Configuration(false)); - //dagBuilder.setApplicationConfiguration(dag, appName); OperatorMeta om1 = dag.getOperatorMeta("operator1"); - Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(gt1.outport1).getValue(PortContext.QUEUE_CAPACITY)); + Assert.assertEquals("", Integer.valueOf(16 * 1024), om1.getMeta(app.gt1.outport1).getValue(PortContext.QUEUE_CAPACITY)); OperatorMeta om2 = dag.getOperatorMeta("operator2"); - Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(gt2.inport1).getValue(PortContext.QUEUE_CAPACITY)); - Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(gt2.outport1).getValue(PortContext.QUEUE_CAPACITY)); + Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.inport1).getValue(PortContext.QUEUE_CAPACITY)); + Assert.assertEquals("", Integer.valueOf(32 * 1024), om2.getMeta(app.gt2.outport1).getValue(PortContext.QUEUE_CAPACITY)); OperatorMeta om3 = dag.getOperatorMeta("operator3"); - Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(gt3.inport1).getValue(PortContext.QUEUE_CAPACITY)); - Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(gt3.inport2).getValue(PortContext.QUEUE_CAPACITY)); + Assert.assertEquals("", Integer.valueOf(16 * 1024), om3.getMeta(app.gt3.inport1).getValue(PortContext.QUEUE_CAPACITY)); + Assert.assertEquals("", Integer.valueOf(32 * 1024), om3.getMeta(app.gt3.inport2).getValue(PortContext.QUEUE_CAPACITY)); } @Test public void testInvalidAttribute() throws Exception { - Assert.assertNotSame(0, com.datatorrent.api.Context.DAGContext.serialVersionUID); - Set<Attribute<Object>> appAttributes = AttributeInitializer.getAttributes(com.datatorrent.api.Context.DAGContext.class); - Attribute<Object> attribute = new Attribute<Object>("", null); + Attribute<String> attribute = new Attribute<>("", null); Field nameField = Attribute.class.getDeclaredField("name"); nameField.setAccessible(true); nameField.set(attribute, "NOT_CONFIGURABLE"); nameField.setAccessible(false); - appAttributes.add(attribute); + ContextUtils.addAttribute(com.datatorrent.api.Context.DAGContext.class, attribute); + AttributeParseUtils.initialize(); + ConfElement.initialize(); // attribute that cannot be configured @@ -656,7 +651,9 @@ public class LogicalPlanConfigurationTest { Assert.assertThat("Attribute not configurable", e.getMessage(), RegexMatcher.matches("Attribute does not support property configuration: NOT_CONFIGURABLE.*")); } - appAttributes.remove(attribute); + ContextUtils.removeAttribute(com.datatorrent.api.Context.DAGContext.class, attribute); + AttributeParseUtils.initialize(); + ConfElement.initialize(); // invalid attribute name props = new Properties(); @@ -667,9 +664,9 @@ public class LogicalPlanConfigurationTest { new LogicalPlanConfiguration(new Configuration(false)).addFromProperties(props, null); Assert.fail("Exception expected"); } catch (Exception e) { + LOG.debug("Exception message: {}", e.getMessage()); Assert.assertThat("Invalid attribute name", e.getMessage(), RegexMatcher.matches("Invalid attribute reference: " + invalidAttribute)); } - } @Test @@ -759,6 +756,898 @@ public class LogicalPlanConfigurationTest { dag.validate(); } + /** + * This test and all of the following ambiguous attribute tests verify that when an ambiguous attribute + * name is provided, all the corresponding attributes are set. + * <br/><br/> + * <b>Note:</b> Ambiguous attribute means that when multiple attributes with the same + * simple name exist for multiple types of dag elements (like operators and ports). + * An example of such attributes are the com.datatorrent.api.Context.OperatorContext.AUTO_RECORD + * and com.datatorrent.api.Context.PortContext.AUTO_RECORD. + * <br/><br/> + * This test should set the attribute on the operators and ports. + */ + /** + * This test should set the attribute on the operators and ports. + */ + @Test + public void testRootLevelAmbiguousAttributeSimple() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX, + null, + Boolean.TRUE, + true, + true); + } + + /** + * This test should set the attribute on the operators and ports. + */ + @Test + public void testApplicationLevelAmbiguousAttributeSimple() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + null, + Boolean.TRUE, + true, + true); + } + + /** + * This should only set the attribute on the operator + */ + @Test + public void testOperatorLevelAmbiguousAttributeSimple() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + null, + Boolean.TRUE, + true, + false); + } + + /** + * This should only set the attribute on the port + */ + @Test + public void testPortLevelAmbiguousAttributeSimple() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "port" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + null, + Boolean.TRUE, + false, + true); + } + + /** + * This test should set the attribute on the operators and ports. + */ + @Test + public void testRootLevelAmbiguousAttributeComplex() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX, + PortContext.class.getCanonicalName(), + Boolean.TRUE, + false, + true); + } + + /** + * This test should set the attribute on the operators and ports. + */ + @Test + public void testApplicationLevelAmbiguousAttributeComplex() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + PortContext.class.getCanonicalName(), + Boolean.TRUE, + false, + true); + } + + /** + * This should only set the attribute on the operator + */ + @Test + public void testOperatorLevelAmbiguousAttributeComplex() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + OperatorContext.class.getCanonicalName(), + Boolean.TRUE, + true, + false); + } + + /** + * This should only set the attribute on the port + */ + @Test + public void testOperatorLevelAmbiguousAttributeComplex2() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + PortContext.class.getCanonicalName(), + Boolean.TRUE, + false, + true); + } + + /** + * This should only set the attribute on the port + */ + @Test + public void testPortLevelAmbiguousAttributeComplex() + { + testAttributeAmbiguousSimpleHelper(Context.OperatorContext.AUTO_RECORD, + Context.PortContext.AUTO_RECORD, + StreamingApplication.DT_PREFIX + + "port" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + PortContext.class.getCanonicalName(), + Boolean.TRUE, + false, + true); + } + + private void testAttributeAmbiguousSimpleHelper(Attribute<?> attributeObjOperator, + Attribute<?> attributeObjPort, + String root, + String contextClass, + Object val, + boolean operatorSet, + boolean portSet) + { + Properties props = propertiesBuilder(attributeObjOperator.getSimpleName(), + root, + contextClass, + val); + + simpleAttributeOperatorHelperAssert(attributeObjOperator, + props, + val, + operatorSet); + + simpleNamePortAssertHelperAssert(attributeObjPort, + props, + val, + portSet); + } + + @Test + public void testRootLevelAttributeSimpleNameOperator() + { + simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true, + true); + } + + @Test + public void testRootLevelStorageAgentSimpleNameOperator() + { + MockStorageAgent mockAgent = new MockStorageAgent(); + + simpleAttributeOperatorHelper(OperatorContext.STORAGE_AGENT, + StreamingApplication.DT_PREFIX, + true, + mockAgent, + true, + false); + } + + @Test + public void testRootLevelAttributeSimpleNameOperatorNoScope() + { + simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true, + false); + } + + @Test + public void testApplicationLevelAttributeSimpleNameOperator() + { + simpleAttributeOperatorHelper(OperatorContext.MEMORY_MB, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true, + true); + } + + private void simpleAttributeOperatorHelper(Attribute<?> attributeObj, + String root, + boolean simpleName, + Object val, + boolean set, + boolean scope) + { + Properties props = propertiesBuilderOperator(attributeObj.getSimpleName(), + root, + simpleName, + val, + scope); + + simpleAttributeOperatorHelperAssert(attributeObj, + props, + val, + set); + } + + private void simpleAttributeOperatorHelperAssert(Attribute<?> attributeObj, + Properties props, + Object val, + boolean set) + { + SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); + + LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); + dagBuilder.addFromProperties(props, null); + + String appPath = app.getClass().getName().replace(".", "/") + ".class"; + + LogicalPlan dag = new LogicalPlan(); + dagBuilder.prepareDAG(dag, app, appPath); + + OperatorMeta om1 = dag.getOperatorMeta("operator1"); + + if (set) { + Assert.assertEquals(val, om1.getValue(attributeObj)); + } else { + Assert.assertNotEquals(val, om1.getValue(attributeObj)); + } + + OperatorMeta om2 = dag.getOperatorMeta("operator2"); + + if (set) { + Assert.assertEquals(val, om2.getValue(attributeObj)); + } else { + Assert.assertNotEquals(val, om2.getValue(attributeObj)); + } + + OperatorMeta om3 = dag.getOperatorMeta("operator3"); + + if (set) { + Assert.assertEquals(val, om3.getValue(attributeObj)); + } else { + Assert.assertNotEquals(val, om3.getValue(attributeObj)); + } + } + + /* Port tests */ + @Test + public void testRootLevelAttributeSimpleNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true, + true); + } + + @Test + public void testRootLevelAttributeSimpleNamePortNoScope() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true, + false); + } + + @Test + public void testOperatorLevelAttributeSimpleNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true, + true); + } + + @Test + public void testApplicationLevelAttributeSimpleNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true, + true); + } + + @Test + public void testRootLevelAttributeComplexNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + false, + (Integer)4096, + true, + true); + } + + @Test + public void testRootLevelAttributeComplexNamePortNoScope() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + false, + (Integer)4096, + true, + false); + } + + @Test + public void testOperatorLevelAttributeComplexNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true, + true); + } + + @Test + public void testApplicationLevelAttributeComplexNamePort() + { + simpleAttributePortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true, + true); + } + + /* Input port tests */ + @Test + public void testRootLevelAttributeSimpleNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true); + } + + @Test + public void testOperatorLevelAttributeSimpleNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true); + } + + @Test + public void testApplicationLevelAttributeSimpleNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true); + } + + @Test + public void testRootLevelAttributeComplexNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + false, + (Integer)4096, + true); + } + + @Test + public void testOperatorLevelAttributeComplexNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true); + } + + @Test + public void testApplicationLevelAttributeComplexNameInputPort() + { + simpleAttributeInputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true); + } + + /* Output port tests */ + @Test + public void testRootLevelAttributeSimpleNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + true, + (Integer)4096, + true); + } + + @Test + public void testOperatorLevelAttributeSimpleNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true); + } + + @Test + public void testApplicationLevelAttributeSimpleNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + true, + (Integer)4096, + true); + } + + @Test + public void testRootLevelAttributeComplexNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX, + false, + (Integer)4096, + true); + } + + @Test + public void testOperatorLevelAttributeComplexNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true); + } + + @Test + public void testApplicationLevelAttributeComplexNameOutputPort() + { + simpleAttributeOutputPortHelper(PortContext.QUEUE_CAPACITY, + StreamingApplication.DT_PREFIX + + "application" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "SimpleTestApp" + + LogicalPlanConfiguration.KEY_SEPARATOR, + false, + (Integer)4096, + true); + } + + /* Helpers for building ports */ + private void simpleAttributePortHelper(Attribute<?> attributeObj, + String root, + boolean simpleName, + Object val, + boolean set, + boolean scope) + { + Properties props = propertiesBuilderPort(attributeObj.getSimpleName(), + root, + simpleName, + val, + scope); + + simpleNamePortAssertHelperAssert(attributeObj, + props, + val, + set); + } + + private void simpleAttributeInputPortHelper(Attribute<?> attributeObj, + String root, + boolean simpleName, + Object val, + boolean set) + { + Properties props = propertiesBuilderInputPort(attributeObj.getSimpleName(), + root, + simpleName, + val); + + simpleNameInputPortAssertHelperAssert(attributeObj, + props, + val, + set); + + simpleNameOutputPortAssertHelperAssert(attributeObj, + props, + val, + !set); + } + + private void simpleAttributeOutputPortHelper(Attribute<?> attributeObj, + String root, + boolean simpleName, + Object val, + boolean set) + { + Properties props = propertiesBuilderOutputPort(attributeObj.getSimpleName(), + root, + simpleName, + val); + + simpleNameOutputPortAssertHelperAssert(attributeObj, + props, + val, + set); + + simpleNameInputPortAssertHelperAssert(attributeObj, + props, + val, + !set); + } + + private void simpleNamePortAssertHelperAssert(Attribute<?> attributeObj, + Properties props, + Object val, + boolean set) + { + SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); + + LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); + dagBuilder.addFromProperties(props, null); + + String appPath = app.getClass().getName().replace(".", "/") + ".class"; + + LogicalPlan dag = new LogicalPlan(); + dagBuilder.prepareDAG(dag, app, appPath); + + simpleNamePortAssertHelper(attributeObj, + dag, + "operator1", + val, + set); + + simpleNamePortAssertHelper(attributeObj, + dag, + "operator2", + val, + set); + + simpleNamePortAssertHelper(attributeObj, + dag, + "operator3", + val, + set); + } + + private void simpleNameInputPortAssertHelperAssert(Attribute<?> attributeObj, + Properties props, + Object val, + boolean set) + { + SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); + + LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); + dagBuilder.addFromProperties(props, null); + + String appPath = app.getClass().getName().replace(".", "/") + ".class"; + + LogicalPlan dag = new LogicalPlan(); + dagBuilder.prepareDAG(dag, app, appPath); + + simpleNameInputPortAssertHelper(attributeObj, + dag, + "operator1", + val, + set); + + simpleNameInputPortAssertHelper(attributeObj, + dag, + "operator2", + val, + set); + + simpleNameInputPortAssertHelper(attributeObj, + dag, + "operator3", + val, + set); + } + + private void simpleNameOutputPortAssertHelperAssert(Attribute<?> attributeObj, + Properties props, + Object val, + boolean set) + { + SimpleTestApplicationWithName app = new SimpleTestApplicationWithName(); + + LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false)); + dagBuilder.addFromProperties(props, null); + + String appPath = app.getClass().getName().replace(".", "/") + ".class"; + + LogicalPlan dag = new LogicalPlan(); + dagBuilder.prepareDAG(dag, app, appPath); + + simpleNameOutputPortAssertHelper(attributeObj, + dag, + "operator1", + val, + set); + + simpleNameOutputPortAssertHelper(attributeObj, + dag, + "operator2", + val, + set); + + simpleNameOutputPortAssertHelper(attributeObj, + dag, + "operator3", + val, + set); + } + + private void simpleNamePortAssertHelper(Attribute<?> attributeObj, + LogicalPlan dag, + String operatorName, + Object queueCapacity, + boolean set) + { + simpleNameInputPortAssertHelper(attributeObj, + dag, + operatorName, + queueCapacity, + set); + + simpleNameOutputPortAssertHelper(attributeObj, + dag, + operatorName, + queueCapacity, + set); + } + + private void simpleNameInputPortAssertHelper(Attribute<?> attributeObj, + LogicalPlan dag, + String operatorName, + Object queueCapacity, + boolean set) + { + OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName); + + for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) { + if (set) { + Assert.assertEquals(queueCapacity, inputPortMeta.getValue(attributeObj)); + } else { + Assert.assertNotEquals(queueCapacity, inputPortMeta.getValue(attributeObj)); + } + } + } + + private void simpleNameOutputPortAssertHelper(Attribute<?> attributeObj, + LogicalPlan dag, + String operatorName, + Object queueCapacity, + boolean set) + { + OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName); + + for (OutputPortMeta outputPortMeta: operatorMeta.getOutputStreams().keySet()) { + if (set) { + Assert.assertEquals(queueCapacity, outputPortMeta.getValue(attributeObj)); + } else { + Assert.assertNotEquals(queueCapacity, outputPortMeta.getValue(attributeObj)); + } + } + } + + /* Helpers for building properties */ + private Properties propertiesBuilder(String attributeName, + String root, + String contextClass, + Object val) + { + boolean simpleName = contextClass == null; + + if (!simpleName) { + attributeName = contextClass + + LogicalPlanConfiguration.KEY_SEPARATOR + + attributeName; + } + + Properties props = new Properties(); + + String propName = root + + StramElement.ATTR.getValue() + + LogicalPlanConfiguration.KEY_SEPARATOR + + attributeName; + + LOG.debug("adding prop {} with value {}", propName, val.toString()); + + props.put(propName, + val.toString()); + + return props; + } + + private Properties propertiesBuilderOperator(String attributeName, + String root, + boolean simpleName, + Object val, + boolean addOperator) + { + String contextClass = simpleName ? null : OperatorContext.class.getCanonicalName(); + + if (addOperator) { + root += "operator" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR; + } + + return propertiesBuilder(attributeName, + root, + contextClass, + val); + } + + private Properties propertiesBuilderPort(String attributeName, + String root, + boolean simpleName, + Object val, + boolean addPort) + { + String contextClass = simpleName ? null : PortContext.class.getCanonicalName(); + + if (addPort) { + root += "port" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR; + } + + return propertiesBuilder(attributeName, + root, + contextClass, + val); + } + + private Properties propertiesBuilderInputPort(String attributeName, + String root, + boolean simpleName, + Object val) + { + String contextClass = simpleName ? null: PortContext.class.getCanonicalName(); + + root += "inputport" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR; + + return propertiesBuilder(attributeName, + root, + contextClass, + val); + } + + private Properties propertiesBuilderOutputPort(String attributeName, + String root, + boolean simpleName, + Object val) + { + String contextClass = simpleName ? null: PortContext.class.getCanonicalName(); + + root += "outputport" + + LogicalPlanConfiguration.KEY_SEPARATOR + + "*" + + LogicalPlanConfiguration.KEY_SEPARATOR; + + return propertiesBuilder(attributeName, + root, + contextClass, + val); + } + private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class); public static class TestApplication implements StreamingApplication { @@ -818,11 +1707,11 @@ public class LogicalPlanConfigurationTest { } } } - + public static class TestStatsListener implements StatsListener{ - + private int intProp; - + public TestStatsListener() { } @@ -866,11 +1755,34 @@ public class LogicalPlanConfigurationTest { return false; return true; } - } public static class TestSchema { } + + public static class SimpleTestApplication implements StreamingApplication + { + public final GenericTestOperator gt1 = new GenericTestOperator(); + public final GenericTestOperator gt2 = new GenericTestOperator(); + public final GenericTestOperator gt3 = new GenericTestOperator(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.addOperator("operator1", gt1); + dag.addOperator("operator2", gt2); + dag.addOperator("operator3", gt3); + dag.addStream("s1", gt1.outport1, gt2.inport1); + dag.addStream("s2", gt2.outport1, gt3.inport1, gt3.inport2); + } + }; + + @ApplicationAnnotation(name="SimpleTestApp") + public static class SimpleTestApplicationWithName extends SimpleTestApplication + { + }; + + private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java index ac05bed..78173d8 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java @@ -19,6 +19,7 @@ import com.datatorrent.common.util.BaseOperator; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; + import java.util.*; import javax.validation.*; @@ -50,6 +51,7 @@ import com.datatorrent.stram.engine.TestGeneratorInputOperator; import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator; import com.datatorrent.stram.engine.TestOutputOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java new file mode 100644 index 0000000..3975e02 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/MockStorageAgent.java @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datatorrent.stram.plan.logical; + +import com.datatorrent.api.StorageAgent; +import java.io.IOException; + +public class MockStorageAgent implements StorageAgent +{ + public MockStorageAgent() + { + } + + @Override + public void save(Object object, int operatorId, long windowId) throws IOException + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Object load(int operatorId, long windowId) throws IOException + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void delete(int operatorId, long windowId) throws IOException + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public long[] getWindowIds(int operatorId) throws IOException + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public String toString() + { + return MockStorageAgent.class.getCanonicalName(); + } + + @Override + public boolean equals(Object obj) + { + if(obj == null) { + return false; + } + + return obj instanceof MockStorageAgent; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/434a7170/engine/src/test/resources/testTopology.json ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/testTopology.json b/engine/src/test/resources/testTopology.json index 1ea9756..62c5262 100644 --- a/engine/src/test/resources/testTopology.json +++ b/engine/src/test/resources/testTopology.json @@ -13,7 +13,7 @@ "STATS_LISTENERS" : { "java.util.ArrayList" : [ { - "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestStatsListener" : { + "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestStatsListener" : { "intProp" : 222 } } @@ -38,7 +38,7 @@ "com.datatorrent.stram.engine.GenericTestOperator":{ "myStringProperty": "myStringPropertyValue" } - + } }, {
