Repository: incubator-apex-malhar Updated Branches: refs/heads/master 5a4b59ffa -> 9c11400a2
APEXMALHAR-2082 Data Filter Operator - Added Data Filter Operator - Marked it as Evolving - Added test cases around variations Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9c11400a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9c11400a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9c11400a Branch: refs/heads/master Commit: 9c11400a21fe78c1709933293dea9cad1490f887 Parents: 5a4b59f Author: Pradeep A. Dalvi <[email protected]> Authored: Wed May 11 21:36:02 2016 +0530 Committer: Pradeep A. Dalvi <[email protected]> Committed: Tue May 24 12:38:14 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/lib/filter/FilterOperator.java | 213 +++++++++++++++++++ .../datatorrent/lib/filter/FilterAppTest.java | 138 ++++++++++++ .../com/datatorrent/lib/filter/FilterTest.java | 188 ++++++++++++++++ 3 files changed, 539 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java new file mode 100644 index 0000000..8d61d00 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.filter; + +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.expression.Expression; +import com.datatorrent.lib.util.PojoUtils; + +/** + * <b>FilterOperator</b> + * Filter Operator filters out tuples based on the defined condition + * + * <b>Parameters</b> + * - condition: condition based on expression language + * + * <b>Input Port</b> takes POJOs as an input + * + * <b>Output Ports</b> + * - truePort emits POJOs meeting the given condition + * - falsePort emits POJOs not meeting the given condition + * - error port emits POJOs for which evaluating the expression raised an error + * + */ +@InterfaceStability.Evolving +public class FilterOperator extends BaseOperator implements Operator.ActivationListener +{ + private String condition; + private List<String> expressionFunctions = new LinkedList<>(); + + private transient Class<?> inClazz = null; + 
private transient Expression<Boolean> expr = null; + + @AutoMetric + private long trueTuples; + + @AutoMetric + private long falseTuples; + + @AutoMetric + private long errorTuples; + + public final transient DefaultOutputPort<Object> truePort = new DefaultOutputPort<Object>(); + + public final transient DefaultOutputPort<Object> falsePort = new DefaultOutputPort<Object>(); + + public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<Object>(); + + public FilterOperator() + { + expressionFunctions.add("java.lang.Math.*"); + expressionFunctions.add("org.apache.commons.lang3.StringUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*"); + expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*"); + } + + @InputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + public void setup(PortContext context) + { + inClazz = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object t) + { + processTuple(t); + } + }; + + @Override + public void activate(Context context) + { + createExpression(); + } + + @Override + public void deactivate() + { + } + + @Override + public void beginWindow(long windowId) + { + errorTuples = trueTuples = falseTuples = 0; + } + + /** + * createExpression: create an expression from condition of POJO fields + * Override this function for custom field expressions + */ + protected void createExpression() + { + logger.info("Creating an expression for condition {}", condition); + expr = PojoUtils.createExpression(inClazz, condition, Boolean.class, + expressionFunctions.toArray(new String[expressionFunctions.size()])); + } + + /** + * evalExpression: Evaluate condition/expression + * Override this function for custom condition evaluation + */ + protected Boolean evalExpression(Object 
t) + { + return expr.execute(t); + } + + /** + * processTuple: emit POJO meeting condition on truePort + * and if it did not meet condition then on falsePort + */ + private void processTuple(Object t) + { + try { + if (evalExpression(t)) { + truePort.emit(t); + trueTuples++; + } else { + falsePort.emit(t); + falseTuples++; + } + } catch (Exception ex) { + logger.error("Error in expression eval: {}", ex.getMessage()); + logger.debug("Exception: ", ex); + error.emit(t); + errorTuples++; + } + } + + /** + * Returns condition/expression with which Filtering is done + * + * @return condition parameter of Filter Operator + */ + public String getCondition() + { + return condition; + } + + /** + * Set condition/expression with which Filtering operation would be applied + * + * @param condition parameter of Filter Operator + */ + public void setCondition(String condition) + { + logger.info("Changing condition from {} to {}", this.condition, condition); + this.condition = condition; + } + + /** + * Returns the list of expression functions which would be made available to + * expression to use. + * + * @return List of functions available in expression. + */ + public List<String> getExpressionFunctions() + { + return expressionFunctions; + } + + /** + * Set list of import classes/methods that should be made statically available + * to expression to use. + * For ex. org.apache.apex.test1.Test would mean that "Test" method will be + * available in the expression to be used directly. + * This is an optional property. See constructor to see defaults that are included. + * + * @param expressionFunctions List of qualified classes/methods that need to be + * imported to expression. 
+ */ + public void setExpressionFunctions(List<String> expressionFunctions) + { + this.expressionFunctions = expressionFunctions; + } + + private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java new file mode 100644 index 0000000..3feb899 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/filter/FilterAppTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.filter; + +import java.util.Random; +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Application Test for Filter Operator. + */ +public class FilterAppTest +{ + @Test + public void testFilterApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class Application implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + DummyInputGenerator input = dag.addOperator("Input", new DummyInputGenerator()); + FilterOperator filter = dag.addOperator("Filter", new FilterOperator()); + + filter.setCondition("(({$}.getNum() % 10) == 0)"); + + ConsoleOutputOperator trueConsole = dag.addOperator("TrueConsole", new ConsoleOutputOperator()); + trueConsole.setSilent(true); + ConsoleOutputOperator falseConsole = dag.addOperator("FalseConsole", new ConsoleOutputOperator()); + falseConsole.setSilent(true); + ConsoleOutputOperator errorConsole = dag.addOperator("ErrorConsole", new ConsoleOutputOperator()); + errorConsole.setSilent(true); + + dag.getMeta(filter).getMeta(filter.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, DummyPOJO.class); + + dag.addStream("Connect", input.output, filter.input); + + 
dag.addStream("ConditionTrue", filter.truePort, trueConsole.input); + dag.addStream("ConditionFalse", filter.falsePort, falseConsole.input); + dag.addStream("ConditionError", filter.error, errorConsole.input); + } + } + + public static class DummyPOJO + { + private int num; + + public DummyPOJO() + { + //for kryo + } + + public DummyPOJO(int num) + { + this.num = num; + } + + public int getNum() + { + return num; + } + + public void setNum(int num) + { + this.num = num; + } + } + + public static class DummyInputGenerator implements InputOperator + { + public final transient DefaultOutputPort<DummyPOJO> output = new DefaultOutputPort<>(); + Random randomGenerator = new Random(); + + @Override + public void emitTuples() + { + output.emit(new DummyPOJO(randomGenerator.nextInt(1000))); + } + + @Override + public void beginWindow(long l) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void teardown() + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9c11400a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java new file mode 100644 index 0000000..ba32942 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.filter; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.testbench.CountTestSink; +import com.datatorrent.stram.engine.PortContext; + +/** + * Tests for FilterOperator + */ +public class FilterTest +{ + public static class DummyPrivatePOJO + { + private long val; + + public long getVal() + { + return val; + } + + public void setVal(long val) + { + this.val = val; + } + } + + public static class DummyPublicPOJO + { + public long val; + } + + + private static FilterOperator filter; + private static DummyPrivatePOJO data; + private static DummyPublicPOJO pdata; + + private static CountTestSink<Object> trueSink; + private static CountTestSink<Object> falseSink; + private static CountTestSink<Object> errorSink; + + public void clearSinks() + { + trueSink.clear(); + falseSink.clear(); + errorSink.clear(); + } + + public void prepareFilterOperator(Class<?> inClass, String condition) + { + filter.truePort.setSink(trueSink); + filter.falsePort.setSink(falseSink); + filter.error.setSink(errorSink); + + filter.setup(null); + + Attribute.AttributeMap in = new Attribute.AttributeMap.DefaultAttributeMap(); + in.put(Context.PortContext.TUPLE_CLASS, inClass); + filter.input.setup(new PortContext(in, null)); + + filter.setCondition(condition); + + filter.activate(null); + } + + public void clearFilterOperator() + { + clearSinks(); + + filter.deactivate(); + filter.teardown(); + } + + @Test + 
public void testFilterPrivate() + { + prepareFilterOperator(DummyPrivatePOJO.class, "({$}.getVal() == 100)"); + + filter.beginWindow(0); + + data.setVal(100); + filter.input.put(data); + Assert.assertEquals("true condition true tuples", 1, trueSink.getCount()); + Assert.assertEquals("true condition false tuples", 0, falseSink.getCount()); + Assert.assertEquals("true condition error tuples", 0, errorSink.getCount()); + + filter.endWindow(); + + clearSinks(); + + /* when condition is not true */ + filter.beginWindow(1); + + data.setVal(1000); + filter.input.put(data); + Assert.assertEquals("false condition true tuples", 0, trueSink.getCount()); + Assert.assertEquals("false condition false tuples", 1, falseSink.getCount()); + Assert.assertEquals("false condition error tuples", 0, errorSink.getCount()); + + filter.endWindow(); + + clearFilterOperator(); + } + + @Test + public void testFilterPublic() + { + prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 100)"); + + /* when condition is true */ + filter.beginWindow(0); + + pdata.val = 100; + filter.input.put(pdata); + Assert.assertEquals("true condition true tuples", 1, trueSink.getCount()); + Assert.assertEquals("true condition false tuples", 0, falseSink.getCount()); + Assert.assertEquals("true condition error tuples", 0, errorSink.getCount()); + + filter.endWindow(); + + clearSinks(); + + /* when condition is not true */ + filter.beginWindow(1); + + pdata.val = 1000; + filter.input.put(pdata); + Assert.assertEquals("false condition true tuples", 0, trueSink.getCount()); + Assert.assertEquals("false condition false tuples", 1, falseSink.getCount()); + Assert.assertEquals("false condition error tuples", 0, errorSink.getCount()); + + filter.endWindow(); + + clearFilterOperator(); + } + + @Test + public void testFilterError() + { + prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)"); + + filter.beginWindow(0); + + filter.input.put(data); + Assert.assertEquals("error condition true tuples", 0, 
trueSink.getCount()); + Assert.assertEquals("error condition false tuples", 0, falseSink.getCount()); + Assert.assertEquals("error condition error tuples", 1, errorSink.getCount()); + + filter.endWindow(); + + clearFilterOperator(); + } + + @BeforeClass + public static void setup() + { + data = new DummyPrivatePOJO(); + pdata = new DummyPublicPOJO(); + filter = new FilterOperator(); + + trueSink = new CountTestSink<>(); + falseSink = new CountTestSink<>(); + errorSink = new CountTestSink<>(); + } +}
