[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403976#comment-15403976 ]

ASF GitHub Bot commented on APEXMALHAR-1701:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/335#discussion_r73156746
  
    --- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
 ---
    @@ -0,0 +1,171 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.dedup;
    +
    +import java.util.Date;
    +import java.util.HashMap;
    +import java.util.Random;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.google.common.collect.Maps;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.InputOperator;
    +import com.datatorrent.api.LocalMode;
    +import com.datatorrent.api.StreamingApplication;
    +import com.datatorrent.common.partitioner.StatelessPartitioner;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.io.ConsoleOutputOperator;
    +
    +/**
    + * Tests whether the operator functions correctly when partitioned.
    + * The default partitioning of Dedup is overridden so that tuples are
    + * partitioned on the basis of the key in the tuple.
    + *
    + */
    +public class DeduperPartitioningTest
    +{
    +  public static final int NUM_DEDUP_PARTITIONS = 5;
    +  private static boolean testFailed = false;
    +
    +  /**
    +   * Application to test the partitioning
    +   *
    +   */
    +  public static class TestDedupApp implements StreamingApplication
    +  {
    +    @Override
    +    public void populateDAG(DAG dag, Configuration conf)
    +    {
    +      TestGenerator gen = dag.addOperator("Generator", new 
TestGenerator());
    +
    +      TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper());
    +      dedup.setKeyExpression("id");
    +      dedup.setTimeExpression("eventTime.getTime()");
    +      dedup.setBucketSpan(60);
    +      dedup.setExpireBefore(600);
    +      
    +      ConsoleOutputOperator console = dag.addOperator("Console", new 
ConsoleOutputOperator());
    +      dag.addStream("Generator to Dedup", gen.output, dedup.input);
    +      dag.addStream("Dedup to Console", dedup.unique, console.input);
    +      dag.setInputPortAttribute(dedup.input, 
Context.PortContext.TUPLE_CLASS, TestEvent.class);
    +      dag.setOutputPortAttribute(dedup.unique, 
Context.PortContext.TUPLE_CLASS, TestEvent.class);
    +      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, 
    +          new 
StatelessPartitioner<TimeBasedDedupOperator>(NUM_DEDUP_PARTITIONS));
    +    }
    +  }
    +
    +  public static class TestDeduper extends TimeBasedDedupOperator
    +  {
    +    int operatorId;
    +    boolean started = false;
    +    HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
    +
    +    @Override
    +    public void setup(OperatorContext context)
    +    {
    +      super.setup(context);
    +      operatorId = context.getId();
    +    }
    +
    +    @Override
    +    protected void processTuple(Object tuple)
    +    {
    +      TestEvent event = (TestEvent)tuple;
    +      if (partitionMap.containsKey(event.id)) {
    +        if (partitionMap.get(event.id) != operatorId) {
    +          testFailed = true;
    +          throw new RuntimeException("Wrong tuple assignment");
    +        }
    +      } else {
    +        partitionMap.put(event.id, operatorId);
    +      }
    +    }
    +  }
    +
    +  public static class TestGenerator extends BaseOperator implements 
InputOperator
    +  {
    +
    +    public final transient DefaultOutputPort<TestEvent> output = new 
DefaultOutputPort<>();
    +    private final transient Random r = new Random();
    +
    +    @Override
    +    public void emitTuples()
    +    {
    +      TestEvent event = new TestEvent();
    +      event.id = r.nextInt(100);
    +      output.emit(event);
    +    }
    +  }
    +
    +  public static class TestEvent
    +  {
    +    private int id;
    +    private Date eventTime;
    +
    +    public TestEvent()
    +    {
    +    }
    +
    +    public int getId()
    +    {
    +      return id;
    +    }
    +
    +    public void setId(int id)
    +    {
    +      this.id = id;
    +    }
    +
    +    public Date getEventTime()
    +    {
    +      return eventTime;
    +    }
    +
    +    public void setEventTime(Date eventTime)
    +    {
    +      this.eventTime = eventTime;
    +    }
    +  }
    +
    +  /**
    +   * This test validates whether a tuple key goes to exactly one partition
    +   */
    +  @Test
    +  public void testDeduperStreamCodec()
    +  {
    +    try {
    +      LocalMode lma = LocalMode.newInstance();
    +      Configuration conf = new Configuration(false);
    +      lma.prepareDAG(new TestDedupApp(), conf);
    +      LocalMode.Controller lc = lma.getController();
    +      lc.run(10 * 1000); // runs for 10 seconds and quits
    --- End diff --
    
    done



> Deduper : create a deduper backed by Managed State
> --------------------------------------------------
>
>                 Key: APEXMALHAR-1701
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
>             Project: Apache Apex Malhar
>          Issue Type: Task
>          Components: algorithms
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> Need a deduplicator operator that is based on Managed State.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to