APEX-157 #comment #resolve Added changes for attribute serializable check in dag.validate
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b799bd2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b799bd2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b799bd2d Branch: refs/heads/feature-module Commit: b799bd2d3edd7af3bdf4631129601e68ce349bf4 Parents: 4adac06 Author: ishark <[email protected]> Authored: Thu Sep 24 17:05:13 2015 -0700 Committer: ishark <[email protected]> Committed: Mon Sep 28 16:43:08 2015 -0700 ---------------------------------------------------------------------- .../stram/plan/logical/LogicalPlan.java | 23 ++++++ .../logical/LogicalPlanConfigurationTest.java | 6 ++ .../stram/plan/logical/LogicalPlanTest.java | 83 +++++++++++++++++++- engine/src/test/resources/testTopology.json | 2 +- 4 files changed, 112 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index f068884..6405644 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; import com.datatorrent.api.*; +import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; @@ -1312,6 +1313,8 @@ public class LogicalPlan implements Serializable, DAG Validation.buildDefaultValidatorFactory(); Validator validator = factory.getValidator(); + checkAttributeValueSerializable(this.getAttributes(), DAG.class.getName()); + // clear oioRoot values in all operators for (OperatorMeta n: operators.values()) { n.oioRoot = null; @@ -1336,6 +1339,8 @@ public class LogicalPlan implements Serializable, DAG OperatorMeta.PortMapping portMapping = n.getPortMapping(); + checkAttributeValueSerializable(n.getAttributes(), n.getName()); + // Check operator annotation if (n.operatorAnnotation != null) { // Check if partition property of the operator is being honored @@ -1368,6 +1373,7 @@ public class LogicalPlan implements Serializable, DAG // check that non-optional ports are connected for (InputPortMeta pm: portMapping.inPortMap.values()) { + checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName()); StreamMeta sm = n.inputStreams.get(pm); if (sm == null) { if ((pm.portAnnotation == null || !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) { @@ -1397,6 +1403,7 @@ public class LogicalPlan implements Serializable, DAG boolean allPortsOptional = true; for (OutputPortMeta pm: portMapping.outPortMap.values()) { + checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName()); if (!n.outputStreams.containsKey(pm)) { if ((pm.portAnnotation != null && !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) { throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName()); @@ -1458,6 +1465,22 @@ public class LogicalPlan implements Serializable, DAG } + private void checkAttributeValueSerializable(AttributeMap attributes, String context) + { + StringBuilder sb = new StringBuilder(); + String delim = ""; + // Check all attributes got operator are serializable + for (Entry<Attribute<?>, Object> entry : attributes.entrySet()) { + if (entry.getValue() != null && !(entry.getValue() instanceof Serializable)) { + sb.append(delim).append(entry.getKey().getSimpleName()); + delim = ", "; + } + } + if (sb.length() > 0) { + throw new ValidationException("Attribute value(s) for " + sb.toString() + " in " + context + " are not serializable"); + } + } + /* * Validates OIO constraints for operators with more than one input streams * For a node to be OIO, http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java index c4ad724..1d95afe 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java @@ -20,6 +20,7 @@ package com.datatorrent.stram.plan.logical; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; import java.io.StringWriter; import java.lang.reflect.Field; @@ -84,6 +85,11 @@ public class LogicalPlanConfigurationTest { return n; } + public static class TestStreamCodec<T> extends JsonStreamCodec<T> implements Serializable + { + private static final long serialVersionUID = 1L; + } + /** * Test read from dt-site.xml in Hadoop configuration format. */ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java index 52c5f7d..a4ac488 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java @@ -19,9 +19,11 @@ package com.datatorrent.stram.plan.logical; import com.datatorrent.common.util.BaseOperator; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Serializable; +import java.lang.reflect.Field; import java.util.*; import javax.validation.*; @@ -41,6 +43,7 @@ import static org.junit.Assert.*; import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.api.*; +import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; @@ -683,7 +686,85 @@ public class LogicalPlanTest { Assert.assertNotNull("port object null", o1Clone.inport1); } - private static class TestStreamCodec implements StreamCodec<Object> { + @Test + public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException + { + LogicalPlan dag = new LogicalPlan(); + Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String()); + Field nameField = Attribute.class.getDeclaredField("name"); + nameField.setAccessible(true); + nameField.set(attr, "Test_Attribute"); + nameField.setAccessible(false); + + assertNotNull(attr); + // Dag attribute not serializable test + dag.setAttribute(attr, new TestAttributeValue()); + try { + dag.validate(); + Assert.fail("Setting not serializable attribute should throw exception"); + } catch (ValidationException e) { + assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in com.datatorrent.api.DAG are not serializable", e.getMessage()); + } + + // Operator attribute not serializable test + dag = new LogicalPlan(); + TestGeneratorInputOperator operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class); + dag.setAttribute(operator, attr, new TestAttributeValue()); + try { + dag.validate(); + Assert.fail("Setting not serializable attribute should throw exception"); + } catch (ValidationException e) { + assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator are not serializable", e.getMessage()); + } + + // Output Port attribute not serializable test + dag = new LogicalPlan(); + operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class); + dag.setOutputPortAttribute(operator.outport, attr, new TestAttributeValue()); + try { + dag.validate(); + Assert.fail("Setting not serializable attribute should throw exception"); + } catch (ValidationException e) { + assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.outport are not serializable", e.getMessage()); + } + + // Input Port attribute not serializable test + dag = new LogicalPlan(); + GenericTestOperator operator1 = dag.addOperator("TestOperator", GenericTestOperator.class); + dag.setInputPortAttribute(operator1.inport1, attr, new TestAttributeValue()); + try { + dag.validate(); + Assert.fail("Setting non serializable attribute should throw exception"); + } catch (ValidationException e) { + assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.inport1 are not serializable", e.getMessage()); + } + } + + private static class Object2String implements StringCodec<Object> + { + + @Override + public Object fromString(String string) + { + // Stub method for testing - do nothing + return null; + } + + @Override + public String toString(Object pojo) + { + // Stub method for testing - do nothing + return null; + } + + } + + private static class TestAttributeValue + { + } + + private static class TestStreamCodec implements StreamCodec<Object> + { @Override public Object fromByteArray(Slice fragment) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/resources/testTopology.json ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/testTopology.json b/engine/src/test/resources/testTopology.json index 62c5262..45e1f0e 100644 --- a/engine/src/test/resources/testTopology.json +++ b/engine/src/test/resources/testTopology.json @@ -25,7 +25,7 @@ "attributes": { "UNIFIER_LIMIT": 8, "STREAM_CODEC" : { - "com.datatorrent.common.codec.JsonStreamCodec" : {} + "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestStreamCodec" : {} } } }
