Repository: apex-malhar
Updated Branches:
  refs/heads/master 822323d02 -> 17f6c5523
APEXMALHAR-2185: Added Bounded Deduper

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cc62a5eb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cc62a5eb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cc62a5eb

Branch: refs/heads/master
Commit: cc62a5eb7d6e58a01bf3e5a32edc889ceb43a75b
Parents: 1700725
Author: bhupeshchawda <[email protected]>
Authored: Thu Aug 11 15:40:07 2016 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Mon Aug 22 10:43:33 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  |  25 ++-
 .../malhar/lib/dedup/BoundedDedupOperator.java  | 206 +++++++++++++++++++
 .../lib/dedup/TimeBasedDedupOperator.java       |  31 ++-
 .../lib/dedup/DeduperBoundedPOJOImplTest.java   | 110 ++++++++++
 4 files changed, 359 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index d06acc3..13a3475 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -30,9 +30,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -48,7 +50,6 @@
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
-import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -68,6 +69,9 @@ import com.datatorrent.netlet.util.Slice;
 public abstract class AbstractDeduper<T>
     implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener
 {
+
+  private static final String BUCKET_DIR = "bucket_data";
+
   /**
    * The input port on which events are received.
    */
@@ -102,7 +106,7 @@
   private boolean preserveTupleOrder = true;
 
   @NotNull
-  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+  protected AbstractManagedStateImpl managedState;
 
   /**
    * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
@@ -123,9 +127,8 @@
   @Override
   public void setup(OperatorContext context)
   {
-    FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
-    fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data");
-    managedState.setFileAccess(fAccessImpl);
+    ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
+        + Path.SEPARATOR + BUCKET_DIR);
     managedState.setup(context);
 
     if (preserveTupleOrder) {
@@ -155,9 +158,7 @@
    */
   protected void processTuple(T tuple)
   {
-
-    long time = getTime(tuple);
-    Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+    Future<Slice> valFuture = getAsyncManagedState(tuple);
 
     if (valFuture.isDone()) {
       try {
@@ -211,7 +212,7 @@
   {
     if (!preserveTupleOrder || waitingEvents.isEmpty()) {
       if (value == null) {
-        managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+        putManagedState(tuple);
         processUnique(tuple);
       } else {
         processDuplicate(tuple);
@@ -309,7 +310,7 @@
     if (future.isDone() || finalize ) {
       try {
         if (future.get() == null && asyncEvents.get(tupleKey) == null) {
-          managedState.put(tupleTime, tupleKey, new Slice(new byte[0]));
+          putManagedState(tuple);
          asyncEvents.put(tupleKey, tupleTime);
           processUnique(tuple);
         } else {
@@ -339,6 +340,10 @@
     managedState.endWindow();
   }
 
+  protected abstract Future<Slice> getAsyncManagedState(T tuple);
+
+  protected abstract void putManagedState(T tuple);
+
   /**
    * Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained.
    *
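Taken together, the changes above turn AbstractDeduper into a template: the base class no longer instantiates state, and each concrete deduper assigns its own managed-state implementation and routes reads and writes through the two new hooks. A minimal sketch of what a subclass must now supply (a hypothetical StringDeduper, not part of this commit, keying tuples on their raw bytes and bucketing by processing time):

import java.util.concurrent.Future;

import org.apache.apex.malhar.lib.dedup.AbstractDeduper;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.netlet.util.Slice;

public class StringDeduper extends AbstractDeduper<String>
{
  public StringDeduper()
  {
    // setup() in the base class now dereferences managedState directly,
    // so a concrete state implementation must be assigned up front
    managedState = new ManagedTimeUnifiedStateImpl();
  }

  @Override
  public void setup(OperatorContext context)
  {
    managedState.setTimeBucketAssigner(new TimeBucketAssigner());
    super.setup(context);
  }

  @Override
  protected long getTime(String tuple)
  {
    return System.currentTimeMillis();
  }

  @Override
  protected Slice getKey(String tuple)
  {
    return new Slice(tuple.getBytes());
  }

  @Override
  protected Future<Slice> getAsyncManagedState(String tuple)
  {
    // Reads go through the subclass's own state implementation
    return ((ManagedTimeUnifiedStateImpl)managedState).getAsync(getTime(tuple), getKey(tuple));
  }

  @Override
  protected void putManagedState(String tuple)
  {
    // Only the presence of the key matters for dedup, so an empty value is stored
    ((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
  }

  @Override
  public void activate(Context context)
  {
  }

  @Override
  public void deactivate()
  {
  }
}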

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
new file mode 100644
index 0000000..e71762e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of {@link AbstractDeduper} which handles the case of a bounded data set.
+ * This implementation assumes that the incoming tuple does not have a time field, and that de-duplication
+ * is strictly based on the key of the tuple.
+ *
+ * This implementation uses {@link ManagedTimeStateImpl} for storing the tuple keys on persistent storage.
+ *
+ * The following properties need to be configured for the operator to function:
+ * 1. {@link #keyExpression}: The java expression to extract the key fields from the incoming tuple (POJO)
+ * 2. {@link #numBuckets} (optional): The number of buckets to be used for storing the keys of the
+ * incoming tuples.
+ * NOTE: Users can decide upon a proper value for this parameter by estimating the number of distinct keys
+ * in the application. An appropriate value would be sqrt(num distinct keys). If the number of distinct keys
+ * is very large, leave it blank so that the default value of 46340 is used. The rationale for using this
+ * number is that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be
+ * equal to the size of each bucket, thus spreading the load equally among the buckets.
+ *
+ */
+@Evolving
+public class BoundedDedupOperator extends AbstractDeduper<Object>
+{
+  private static final long DEFAULT_CONSTANT_TIME = 0;
+  private static final int DEFAULT_NUM_BUCKETS = 46340;
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  // Optional, but recommended to be provided by the user
+  private int numBuckets = DEFAULT_NUM_BUCKETS;
+
+  private transient Class<?> pojoClass;
+  private transient Getter<Object, Object> keyGetter;
+  private transient StreamCodec<Object> streamCodec;
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+      streamCodec = getDeduperStreamCodec();
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return streamCodec;
+    }
+  };
+
+  public BoundedDedupOperator()
+  {
+    managedState = new ManagedTimeStateImpl();
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (numBuckets == 0) {
+      numBuckets = DEFAULT_NUM_BUCKETS;
+    }
+    ((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets);
+    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    managedState.setTimeBucketAssigner(timeBucketAssigner);
+    super.setup(context);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  protected long getTime(Object tuple)
+  {
+    return DEFAULT_CONSTANT_TIME;
+  }
+
+  @Override
+  protected Slice getKey(Object tuple)
+  {
+    Object key = keyGetter.get(tuple);
+    return streamCodec.toByteArray(key);
+  }
+
+  protected StreamCodec<Object> getDeduperStreamCodec()
+  {
+    return new DeduperStreamCodec(keyExpression);
+  }
+
+  @Override
+  protected Future<Slice> getAsyncManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    Future<Slice> valFuture = ((ManagedTimeStateImpl)managedState).getAsync(getBucketId(key), key);
+    return valFuture;
+  }
+
+  @Override
+  protected void putManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    ((ManagedTimeStateImpl)managedState).put(getBucketId(key), DEFAULT_CONSTANT_TIME,
+        key, new Slice(new byte[0]));
+  }
+
+  protected int getBucketId(Slice key)
+  {
+    // Mask the sign bit: Arrays.hashCode may be negative, and a negative
+    // modulo result would yield an invalid bucket id
+    return (Arrays.hashCode(key.buffer) & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  /**
+   * Returns the key expression
+   * @return key expression
+   */
+  public String getKeyExpression()
+  {
+    return keyExpression;
+  }
+
+  /**
+   * Sets the key expression for the fields used for de-duplication
+   * @param keyExpression the expression
+   */
+  public void setKeyExpression(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  /**
+   * Returns the number of buckets
+   * @return number of buckets
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets
+   * NOTE: Users can decide upon a proper value for this parameter by estimating the number of distinct keys
+   * in the application. An appropriate value would be sqrt(num distinct keys). If the number of distinct keys
+   * is very large, leave it blank so that the default value of 46340 is used. The rationale for using this
+   * number is that sqrt(max integer) = 46340.
+   * This implies that the number of buckets used will roughly be equal to the size of each bucket,
+   * thus spreading the load equally among the buckets.
+   * @param numBuckets the number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}
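For reference, a sketch of how the new operator might be wired into an application. KeyedEventReader, KeyedEventWriter and the KeyedEvent POJO are hypothetical stand-ins for a concrete source, sink and schema; only the deduper API below comes from this commit:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

public class DedupApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KeyedEventReader reader = dag.addOperator("Reader", new KeyedEventReader());
    BoundedDedupOperator dedup = dag.addOperator("Dedup", new BoundedDedupOperator());
    KeyedEventWriter writer = dag.addOperator("Writer", new KeyedEventWriter());

    // The input port is schemaRequired, so the POJO class must be set explicitly
    dag.setInputPortAttribute(dedup.input, PortContext.TUPLE_CLASS, KeyedEvent.class);

    // "key" names a field of KeyedEvent; with roughly 10^6 expected distinct keys,
    // sqrt(10^6) = 1000 buckets balances the bucket count against the bucket size,
    // per the javadoc above
    dedup.setKeyExpression("key");
    dedup.setNumBuckets(1000);

    dag.addStream("Input", reader.output, dedup.input);
    dag.addStream("Unique", dedup.unique, writer.input);
  }
}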

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
index 6aebe6b..225c8a3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -18,11 +18,13 @@
  */
 package org.apache.apex.malhar.lib.dedup;
 
+import java.util.concurrent.Future;
+
 import javax.validation.constraints.NotNull;
 
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -95,6 +97,13 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
 
   private transient Getter<Object, Object> keyGetter;
 
+  private transient StreamCodec<Object> streamCodec;
+
+  public TimeBasedDedupOperator()
+  {
+    managedState = new ManagedTimeUnifiedStateImpl();
+  }
+
   @InputPortFieldAnnotation(schemaRequired = true)
   public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
   {
@@ -102,6 +111,7 @@
     public void setup(PortContext context)
     {
       pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+      streamCodec = getDeduperStreamCodec();
     }
 
     @Override
@@ -113,7 +123,7 @@
     @Override
     public StreamCodec<Object> getStreamCodec()
     {
-      return getDeduperStreamCodec();
+      return streamCodec;
     }
   };
 
@@ -130,7 +140,7 @@
   protected Slice getKey(Object tuple)
   {
     Object key = keyGetter.get(tuple);
-    return new Slice(key.toString().getBytes());
+    return streamCodec.toByteArray(key);
   }
 
   protected StreamCodec<Object> getDeduperStreamCodec()
@@ -165,6 +175,21 @@
   {
   }
 
+  @Override
+  protected Future<Slice> getAsyncManagedState(Object tuple)
+  {
+    Future<Slice> valFuture = ((ManagedTimeUnifiedStateImpl)managedState).getAsync(getTime(tuple),
+        getKey(tuple));
+    return valFuture;
+  }
+
+  @Override
+  protected void putManagedState(Object tuple)
+  {
+    ((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+  }
+
+
   public String getKeyExpression()
   {
     return keyExpression;
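Two easy-to-miss behavior changes in this diff: getStreamCodec() now returns a single cached codec instead of constructing a new DeduperStreamCodec on every call, and getKey() serializes the key through that codec instead of through key.toString(). Assuming the codec performs typed (Kryo-style) serialization, keys that merely stringify alike no longer collide; an illustrative snippet (hypothetical values, not part of the commit):

// With the old code both keys below collapsed to the same state entry,
// since each was stored as new Slice("1".getBytes()):
Integer intKey = 1;
String stringKey = "1";

Slice oldStyleA = new Slice(intKey.toString().getBytes());
Slice oldStyleB = new Slice(stringKey.getBytes());     // equal to oldStyleA

// The codec serializes the typed object, so the two keys stay distinct:
Slice newStyleA = streamCodec.toByteArray(intKey);
Slice newStyleB = streamCodec.toByteArray(stringKey);  // differs from newStyleA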

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
new file mode 100644
index 0000000..448e76f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class DeduperBoundedPOJOImplTest
+{
+  private static String applicationPath;
+  private static final String APPLICATION_PATH_PREFIX = "target/DeduperBoundedPOJOImplTest";
+  private static final String APP_ID = "DeduperBoundedPOJOImplTest";
+  private static final int OPERATOR_ID = 0;
+  private static BoundedDedupOperator deduper;
+  private static final int NUM_BUCKETS = 10;
+
+  @Before
+  public void setup()
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    deduper = new BoundedDedupOperator();
+    deduper.setKeyExpression("key");
+    deduper.setNumBuckets(NUM_BUCKETS);
+  }
+
+  @Test
+  public void testDedup()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    deduper.input.setup(new PortContext(attributes, context));
+    deduper.activate(context);
+    CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.unique, uniqueSink);
+    CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.duplicate, duplicateSink);
+    CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.expired, expiredSink);
+
+    deduper.beginWindow(0);
+    Random r = new Random();
+    int k = 1;
+    for (int i = 1; i <= 1000; i++) {
+      TestPojo pojo = new TestPojo(i, new Date(), k++);
+      deduper.input.process(pojo);
+      if (i % 10 == 0) {
+        int dupId = r.nextInt(i);
+        TestPojo pojoDuplicate = new TestPojo(dupId == 0 ? 1 : dupId, new Date(), k++);
+        deduper.input.process(pojoDuplicate);
+      }
+    }
+    deduper.handleIdleTime();
+    deduper.endWindow();
+
+    Assert.assertTrue(uniqueSink.collectedTuples.size() == 1000);
+    Assert.assertTrue(duplicateSink.collectedTuples.size() == 100);
+
+    deduper.teardown();
+  }
+
+  @After
+  public void teardown()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
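The expected counts follow from the loop arithmetic: each i in 1..1000 contributes one fresh key, and every tenth iteration replays a key drawn from [1, i-1] (dupId == 0 is remapped to 1), which has necessarily been seen before, giving exactly 1000 uniques and 100 duplicates. The call sequence also matters; a condensed sketch of the window protocol the test relies on (same deduper instance as above):

deduper.beginWindow(0);        // all tuples are processed in a single window
deduper.input.process(pojo);   // the verdict may be parked behind a Future
deduper.handleIdleTime();      // drains decisions whose Futures have completed
deduper.endWindow();           // flushes managed state for the window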
