Repository: apex-malhar Updated Branches: refs/heads/master 3e7b76b8a -> d2f0586c9
APEXMALHAR-2111 Projection Operator config changes - Change params to use List instead of comma-separated field names - Mark Operator as Evolving - Add projection operator exception in pom.xml for japicmp-maven-plugin Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ce21b59d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ce21b59d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ce21b59d Branch: refs/heads/master Commit: ce21b59d4d25639abf5a717fbbd4effcef8fe085 Parents: 42b9e22 Author: Pradeep A. Dalvi <[email protected]> Authored: Fri Jun 3 16:09:56 2016 -0700 Committer: Pradeep A. Dalvi <[email protected]> Committed: Fri Jun 10 14:21:52 2016 -0700 ---------------------------------------------------------------------- .../lib/projection/ProjectionOperator.java | 79 +++++++++++++++----- .../lib/projection/ActivateTest.java | 37 +++++---- .../lib/projection/ProjectionTest.java | 8 +- pom.xml | 1 + 4 files changed, 90 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ce21b59d/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java index 8e664a1..8c22140 100644 --- a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java +++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java @@ -20,15 +20,14 @@ package com.datatorrent.lib.projection; import java.lang.reflect.Field; - import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - 
import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; import com.google.common.annotations.VisibleForTesting; @@ -37,13 +36,10 @@ import com.datatorrent.api.Context; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; - import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - import com.datatorrent.common.util.BaseOperator; - import com.datatorrent.lib.util.PojoUtils; /** @@ -74,11 +70,14 @@ import com.datatorrent.lib.util.PojoUtils; * * @since 3.4.0 */ +@InterfaceStability.Evolving public class ProjectionOperator extends BaseOperator implements Operator.ActivationListener<Context> { - protected String selectFields; - protected String dropFields; - protected String condition; + @NotNull + private List<String> selectFields; + + @NotNull + private List<String> dropFields; static class TypeInfo { @@ -162,7 +161,7 @@ public class ProjectionOperator extends BaseOperator implements Operator.Activat }; - public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>(); + public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>(); /** * addProjectedField: Add field details (name, type, getter and setter) for field with given name @@ -202,16 +201,14 @@ public class ProjectionOperator extends BaseOperator implements Operator.Activat public void activate(Context context) { final Field[] allFields = inClazz.getDeclaredFields(); - - if (selectFields != null && !selectFields.isEmpty()) { - List<String> sFields = Arrays.asList(selectFields.split(",")); - for (String s : sFields) { + if ((selectFields != null) && !selectFields.isEmpty()) { + for (String s : selectFields) { addProjectedField(s); } if (remainderClazz != null) { for (Field f : allFields) { - if (!sFields.contains(f.getName())) { + if 
(!selectFields.contains(f.getName())) { addRemainderField(f.getName()); } } @@ -219,11 +216,9 @@ public class ProjectionOperator extends BaseOperator implements Operator.Activat logger.info("Remainder Port does not have Schema class defined"); } } else { - List<String> dFields = new ArrayList<>(); - if (dropFields != null && !dropFields.isEmpty()) { - dFields = Arrays.asList(dropFields.split(",")); + if ((dropFields != null) && !dropFields.isEmpty()) { if (remainderClazz != null) { - for (String s : dFields) { + for (String s : dropFields) { addRemainderField(s); } } else { @@ -232,7 +227,7 @@ public class ProjectionOperator extends BaseOperator implements Operator.Activat } for (Field f : allFields) { - if (!dFields.contains(f.getName())) { + if ((dropFields == null) || !dropFields.contains(f.getName())) { addProjectedField(f.getName()); } } @@ -308,5 +303,49 @@ public class ProjectionOperator extends BaseOperator implements Operator.Activat } } + /** + * set selectFields, a list of fields to be selected from incoming POJO + * + * @param selectFields List of fields from POJO to be selected + * @description $[] Field which become part of selected fields + * @useSchema $[] input.fields[].name + */ + public void setSelectFields(List<String> selectFields) + { + this.selectFields = selectFields; + } + + /** + * get selectFields, a list of fields to be selected from incoming POJO + * + * @return selectFields list of fields from POJO to be selected + */ + public List<String> getSelectFields() + { + return selectFields; + } + + /** + * set dropFields, a list of fields to be dropped from incoming POJO + * + * @param dropFields List of fields from POJO to be selected + * @description $[] Field which become part of dropped fields + * @useSchema $[] input.fields[].name + */ + public void setDropFields(List<String> dropFields) + { + this.dropFields = dropFields; + } + + /** + * get dropFields, a list of fields to be dropped from incoming POJO + * + * @return dropFields list of 
fields from POJO to be selected + */ + public List<String> getDropFields() + { + return dropFields; + } + private static final Logger logger = LoggerFactory.getLogger(ProjectionOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ce21b59d/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java index f0b684f..cab0330 100644 --- a/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java +++ b/library/src/test/java/com/datatorrent/lib/projection/ActivateTest.java @@ -19,7 +19,9 @@ package com.datatorrent.lib.projection; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import org.junit.AfterClass; import org.junit.Assert; @@ -74,14 +76,13 @@ public class ActivateTest } private static ProjectionOperator projection; - private static DummyPOJO data; @Test public void testSelectDropFieldsNull() { logger.debug("start round 0"); - projection.selectFields = null; - projection.dropFields = null; + projection.setSelectFields(null); + projection.setDropFields(null); projection.activate(null); Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size()); Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size()); @@ -93,8 +94,8 @@ public class ActivateTest public void testSelectDropFieldsEmpty() { logger.debug("start round 0"); - projection.selectFields = ""; - projection.dropFields = ""; + projection.setSelectFields(new ArrayList<String>()); + projection.setDropFields(new ArrayList<String>()); projection.activate(null); Assert.assertEquals("projected fields", 3, projection.getProjectedFields().size()); Assert.assertEquals("remainder fields", 0, projection.getRemainderFields().size()); @@ -106,8 +107,11 @@ public class ActivateTest 
public void testSelectFields() { logger.debug("start round 0"); - projection.selectFields = "l,str"; - projection.dropFields = ""; + List<String> sFields = new ArrayList<>(); + sFields.add("l"); + sFields.add("str"); + projection.setSelectFields(sFields); + projection.setDropFields(new ArrayList<String>()); projection.activate(null); Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size()); Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size()); @@ -119,8 +123,11 @@ public class ActivateTest public void testDropFields() { logger.debug("start round 0"); - projection.selectFields = ""; - projection.dropFields = "str,date"; + List<String> dFields = new ArrayList<>(); + dFields.add("str"); + dFields.add("date"); + projection.setDropFields(new ArrayList<String>()); + projection.setDropFields(dFields); projection.activate(null); Assert.assertEquals("projected fields", 1, projection.getProjectedFields().size()); Assert.assertEquals("remainder fields", 2, projection.getRemainderFields().size()); @@ -132,9 +139,14 @@ public class ActivateTest public void testBothFieldsSpecified() { logger.debug("start round 0"); - projection.selectFields = ""; - projection.selectFields = "l,str"; - projection.dropFields = "str,date"; + List<String> sFields = new ArrayList<>(); + sFields.add("l"); + sFields.add("str"); + List<String> dFields = new ArrayList<>(); + dFields.add("str"); + dFields.add("date"); + projection.setSelectFields(sFields); + projection.setDropFields(dFields); projection.activate(null); Assert.assertEquals("projected fields", 2, projection.getProjectedFields().size()); Assert.assertEquals("remainder fields", 1, projection.getRemainderFields().size()); @@ -145,7 +157,6 @@ public class ActivateTest @BeforeClass public static void setup() { - data = new DummyPOJO(); projection = new ProjectionOperator(); projection.inClazz = DummyPOJO.class; projection.projectedClazz = DummyPOJO.class; 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ce21b59d/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java index a47b167..e873768 100644 --- a/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java +++ b/library/src/test/java/com/datatorrent/lib/projection/ProjectionTest.java @@ -20,6 +20,8 @@ package com.datatorrent.lib.projection; import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; import org.junit.AfterClass; import org.junit.Assert; @@ -99,7 +101,7 @@ public class ProjectionTest public Long getFieldValue(Object p, String field) { - Long value = new Long(0); + Long value = 0L; for (Field f: p.getClass().getDeclaredFields()) { f.setAccessible(true); @@ -262,8 +264,10 @@ public class ProjectionTest projection.projectedClazz = ProjectedPOJO.class; projection.remainderClazz = RemainderPOJO.class; + List<String> sFields = new ArrayList<>(); + sFields.add("projected"); - projection.selectFields = "projected"; + projection.setSelectFields(sFields); projection.activate(null); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ce21b59d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3f1b6ee..4274379 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude> <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> <exclude>org.apache.apex.malhar.lib.fs.BytesFileOutputOperator</exclude> + <exclude>com.datatorrent.lib.projection</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip>
