[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15297486#comment-15297486
 ] 

ASF GitHub Bot commented on APEXMALHAR-2082:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/270#discussion_r64316099
  
    --- Diff: 
library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package com.datatorrent.lib.filter;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.PortContext;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.expression.Expression;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * <b>FilterOperator</b>
    + * Filter Operator filter out tuples based on defined condition
    + *
    + * <b>Parameters</b>
    + * - condition: condition based on expression langugage
    + *
    + * <b>Input Port</b> takes POJOs as an input
    + *
    + * <b>Output Ports</b>
    + * - truePort emits POJOs meeting the given condition
    + * - falsePort emits POJOs not meeting the given condition
    + * - error port emits any error situation while evaluating expression
    + * 
    + */
    [email protected]
    +public class FilterOperator extends BaseOperator implements 
Operator.ActivationListener
    +{
    +  private transient Class<?> inClazz = null;
    +  private transient Expression<Boolean> expr = null;
    +  private List<String> expressionFunctions = new LinkedList<>();
    +
    +  @AutoMetric
    +  private long trueTuples;
    +
    +  @AutoMetric
    +  private long falseTuples;
    +
    +  @AutoMetric
    +  private long errorTuples;
    +
    +  protected String condition;
    +
    +  public final transient DefaultOutputPort<Object> truePort = new 
DefaultOutputPort<Object>();
    +
    +  public final transient DefaultOutputPort<Object> falsePort = new 
DefaultOutputPort<Object>();
    +
    +  public final transient DefaultOutputPort<Object> error = new 
DefaultOutputPort<Object>();
    +
    +  public FilterOperator()
    +  {
    +    expressionFunctions.add("java.lang.Math.*");
    +    expressionFunctions.add("org.apache.commons.lang3.StringUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.StringEscapeUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.time.DurationFormatUtils.*");
    +    
expressionFunctions.add("org.apache.commons.lang3.time.DateFormatUtils.*");
    +  }
    +
    +  @InputPortFieldAnnotation(schemaRequired = true)
    +  public transient DefaultInputPort<Object> input = new 
DefaultInputPort<Object>()
    --- End diff --
    
    Please make this port final.


> Data Filter Operator 
> ---------------------
>
>                 Key: APEXMALHAR-2082
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2082
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Pradeep A. Dalvi
>
> Data Filter Operator which will allow apex users to filter (select/drop) 
> tuples based on the certain condition from incoming stream.
> Use case:
> -------------
> In many cases, not all tuples are of interest for the downstream operators. 
> In such cases, one may want select/filter out tuples to downstream. Also one 
> may want to process tuples which did not meet the condition/expression.
> Functionality:
> -----------------
> 1. Any tuple for which expression could not be evaluated shall be emitted on 
> error output port.
> 2. Filter operator shall receive POJO as input tuple and emit POJO on either 
> of remaining output ports i.e. truePort and falsePort. As the output ports' 
> name signify, when condition is met then the POJO shall be emitted on 
> truePort and if condition is not met then that POJO shall be emitted on 
> falsePort.
> 3. Operator needs condition/expression as a input param. This condition is 
> based on expression language support we already have in Malhar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to