[
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403976#comment-15403976
]
ASF GitHub Bot commented on APEXMALHAR-1701:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/335#discussion_r73156746
--- Diff:
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
---
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned.
+ * The partitioning in Dedup is overridden by partitioning on the basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+ public static final int NUM_DEDUP_PARTITIONS = 5;
+ private static boolean testFailed = false;
+
+ /**
+  * Streaming application used to exercise the partitioned deduper.
+  *
+  * Wires three operators in a line ("Generator" to "Deduper" to "Console") and
+  * requests {@code NUM_DEDUP_PARTITIONS} partitions for the deduper.
+  */
+ public static class TestDedupApp implements StreamingApplication
+ {
+   /**
+    * Adds the operators, streams and attributes of the test pipeline to the DAG.
+    *
+    * @param dag the DAG to populate
+    * @param conf configuration handed in by the caller; not read here
+    */
+   @Override
+   public void populateDAG(DAG dag, Configuration conf)
+   {
+     // Register the operators: source, deduper, sink.
+     final TestGenerator source = dag.addOperator("Generator", new TestGenerator());
+     final TestDeduper deduper = dag.addOperator("Deduper", new TestDeduper());
+     final ConsoleOutputOperator sink = dag.addOperator("Console", new ConsoleOutputOperator());
+
+     // The expressions name TestEvent members ("id", "eventTime"); presumably the
+     // deduper evaluates them against each tuple. The units of the span/expiry values
+     // are defined by the operator's setters and are not visible here.
+     deduper.setKeyExpression("id");
+     deduper.setTimeExpression("eventTime.getTime()");
+     deduper.setBucketSpan(60);
+     deduper.setExpireBefore(600);
+
+     dag.addStream("Generator to Dedup", source.output, deduper.input);
+     dag.addStream("Dedup to Console", deduper.unique, sink.input);
+
+     // Both deduper ports are declared to carry TestEvent tuples.
+     final Class<TestEvent> tupleClass = TestEvent.class;
+     dag.setInputPortAttribute(deduper.input, Context.PortContext.TUPLE_CLASS, tupleClass);
+     dag.setOutputPortAttribute(deduper.unique, Context.PortContext.TUPLE_CLASS, tupleClass);
+
+     // Ask for a fixed number of deduper partitions.
+     final StatelessPartitioner<TimeBasedDedupOperator> partitioner =
+         new StatelessPartitioner<TimeBasedDedupOperator>(NUM_DEDUP_PARTITIONS);
+     dag.setAttribute(deduper, OperatorContext.PARTITIONER, partitioner);
+   }
+ }
+
+ /**
+  * Deduper subclass that checks every key is always handled by the same operator id.
+  *
+  * The inherited tuple processing is not invoked; {@link #processTuple(Object)} only
+  * records which operator id first saw a key and fails when a different id sees it later.
+  *
+  * The key-to-owner registry is static so that all TestDeduper instances in the JVM
+  * share it. With a per-instance map each instance would only ever compare its own id
+  * against itself and the check could never fire. This relies on all partitions running
+  * in one JVM, as the LocalMode-based test in this class does.
+  * NOTE(review): entries are never cleared; confirm no other test in this JVM reuses
+  * this operator with different operator ids.
+  */
+ public static class TestDeduper extends TimeBasedDedupOperator
+ {
+   /** Guards {@link #partitionMap}, which is touched by several operator threads. */
+   private static final Object PARTITION_MAP_LOCK = new Object();
+
+   /** Key -> id of the operator that first received the key; shared by all instances. */
+   static HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+
+   /** Id of this operator instance, captured in {@link #setup(OperatorContext)}. */
+   int operatorId;
+   boolean started = false;
+
+   /**
+    * Runs the inherited setup and remembers the id reported by the context.
+    *
+    * @param context operator context supplying the operator id
+    */
+   @Override
+   public void setup(OperatorContext context)
+   {
+     super.setup(context);
+     operatorId = context.getId();
+   }
+
+   /**
+    * Claims the tuple's key for this operator, or verifies an earlier claim.
+    *
+    * @param tuple a {@link TestEvent}
+    * @throws RuntimeException if the key was first seen by a different operator id;
+    *         {@code testFailed} is set before throwing
+    */
+   @Override
+   protected void processTuple(Object tuple)
+   {
+     TestEvent event = (TestEvent)tuple;
+     Integer owner;
+     synchronized (PARTITION_MAP_LOCK) {
+       owner = partitionMap.get(event.id);
+       if (owner == null) {
+         // First sighting of this key anywhere: this operator becomes its owner.
+         partitionMap.put(event.id, operatorId);
+         return;
+       }
+     }
+     if (owner != operatorId) {
+       testFailed = true;
+       throw new RuntimeException("Wrong tuple assignment");
+     }
+   }
+ }
+
+ /**
+  * Input operator feeding the test pipeline.
+  *
+  * Every {@link #emitTuples()} call emits a single {@link TestEvent} whose id is a
+  * random integer in the range [0, 100); the event time is left unset.
+  */
+ public static class TestGenerator extends BaseOperator implements InputOperator
+ {
+   private final transient Random idSource = new Random();
+
+   /** Port on which the generated events are emitted. */
+   public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+
+   /** Creates one event with a random id and emits it on {@link #output}. */
+   @Override
+   public void emitTuples()
+   {
+     final TestEvent next = new TestEvent();
+     next.setId(idSource.nextInt(100));
+     output.emit(next);
+   }
+ }
+
+ /**
+  * Tuple type flowing through the test pipeline: an integer id plus an event time.
+  *
+  * Plain mutable bean with a public no-argument constructor and get/set accessors.
+  * NOTE(review): the accessors are presumably what the "id" and "eventTime.getTime()"
+  * expressions configured in TestDedupApp resolve against; confirm.
+  */
+ public static class TestEvent
+ {
+   private Date eventTime;
+   private int id;
+
+   /** Creates an event with id 0 and no event time. */
+   public TestEvent()
+   {
+   }
+
+   /** @return the event time, or {@code null} if none has been set */
+   public Date getEventTime()
+   {
+     return eventTime;
+   }
+
+   /** @param eventTime the event time to store; kept by reference, not copied */
+   public void setEventTime(Date eventTime)
+   {
+     this.eventTime = eventTime;
+   }
+
+   /** @return the id of this event */
+   public int getId()
+   {
+     return id;
+   }
+
+   /** @param id the id to store */
+   public void setId(int id)
+   {
+     this.id = id;
+   }
+ }
+
+ /**
+ * This test validates whether a tuple key goes to exactly one partition
+ */
+ @Test
+ public void testDeduperStreamCodec()
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new TestDedupApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10 * 1000); // runs for 10 seconds and quits
--- End diff --
done
> Deduper : create a deduper backed by Managed State
> --------------------------------------------------
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
> Issue Type: Task
> Components: algorithms
> Reporter: Chandni Singh
> Assignee: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)