APEXMALHAR-1701: Added features to AbstractDeduper. Added Pojo implementation. Added unit tests.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7dea3d0a Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7dea3d0a Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7dea3d0a Branch: refs/heads/master Commit: 7dea3d0a06875cb30cd5dd9f5d6e353f909fc485 Parents: a1f6266 Author: bhupesh <[email protected]> Authored: Thu Jul 7 15:52:34 2016 +0530 Committer: bhupeshchawda <[email protected]> Committed: Tue Aug 2 19:59:10 2016 +0530 ---------------------------------------------------------------------- .../apex/malhar/lib/dedup/AbstractDeduper.java | 402 ++++++++++++++++--- .../malhar/lib/dedup/DeduperStreamCodec.java | 55 +++ .../lib/dedup/TimeBasedDedupOperator.java | 239 +++++++++++ .../malhar/lib/dedup/DeduperOrderingTest.java | 176 ++++++++ .../lib/dedup/DeduperPartitioningTest.java | 195 +++++++++ .../lib/dedup/DeduperTimeBasedPOJOImplTest.java | 116 ++++++ .../apache/apex/malhar/lib/dedup/TestPojo.java | 82 ++++ 7 files changed, 1203 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java index d23a28a..d06acc3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java @@ -20,135 +20,368 @@ package org.apache.apex.malhar.lib.dedup; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.validation.constraints.NotNull; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.apex.malhar.lib.state.BucketedState; import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; import com.datatorrent.netlet.util.Slice; /** - * An operator that de-dupes a stream. + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
* * @param <T> type of events */ +@Evolving @OperatorAnnotation(checkpointableWithinAppWindow = false) -public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener, - Operator.IdleTimeHandler +public abstract class AbstractDeduper<T> + implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener { + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { + @Override + public final void process(T tuple) + { + processTuple(tuple); + } + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + @NotNull protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); - private transient long sleepMillis; - + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is true. 
+ */ + private transient Map<T, Decision> decisions; private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap(); + private transient Map<Slice, Long> asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents; + @AutoMetric + private transient long duplicateEvents; + @AutoMetric + private transient long expiredEvents; - public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>(); + @Override + public void setup(OperatorContext context) + { + FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl(); + fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data"); + managedState.setFileAccess(fAccessImpl); + managedState.setup(context); - public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>(); + if (preserveTupleOrder) { + decisions = Maps.newLinkedHashMap(); + } + } - public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + @Override + public void beginWindow(long l) { - @Override - public void process(T tuple) - { - long time = getTime(tuple); - Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple)); - if (valFuture.isDone()) { - try { - processEvent(tuple, valFuture.get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("process", e); - } + // Reset Metrics + uniqueEvents = 0; + duplicateEvents = 0; + expiredEvents = 0; + + managedState.beginWindow(l); + } + + protected abstract Slice getKey(T event); + + protected abstract long getTime(T event); + + /** + * Processes an incoming tuple + * + * @param tuple the incoming tuple + */ + protected void processTuple(T tuple) + { + + long time = getTime(tuple); + Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple)); + + if (valFuture.isDone()) { + try { + processEvent(tuple, valFuture.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } else { + 
processWaitingEvent(tuple, valFuture); + } + } + + /** + * Processes a looked-up event + * + * @param tuple the incoming tuple + * @param value the looked up key of the tuple + */ + protected void processEvent(T tuple, Slice value) + { + if (value == BucketedState.EXPIRED) { + processInvalid(tuple); + return; + } + processValid(tuple, value); + } + + /** + * Processes a tuple which is waiting for the lookup to return. + * + * @param tuple The tuple which needs to wait + * @param future The future object which will ultimately return the lookup result + */ + protected void processWaitingEvent(T tuple, Future<Slice> future) + { + waitingEvents.put(tuple, future); + if (preserveTupleOrder) { + recordDecision(tuple, Decision.UNKNOWN); + } + } + + /** + * Processes a valid (non-expired) tuple. This tuple may be a unique or a duplicate. + * + * @param tuple + * The tuple to be processed + * @param value + * Looked up key of the tuple + */ + protected void processValid(T tuple, Slice value) + { + if (!preserveTupleOrder || waitingEvents.isEmpty()) { + if (value == null) { + managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0])); + processUnique(tuple); } else { - waitingEvents.put(tuple, valFuture); + processDuplicate(tuple); } + } else { + processWaitingEvent(tuple, Futures.immediateFuture(value)); } - }; + } - protected abstract long getTime(T tuple); + /** + * Processes invalid tuples. + * + * @param tuple the incoming tuple + */ + protected void processInvalid(T tuple) + { + if (preserveTupleOrder && !decisions.isEmpty()) { + recordDecision(tuple, Decision.EXPIRED); + } else { + processExpired(tuple); + } + } - protected abstract Slice getKey(T tuple); + /** + * Processes an expired tuple + * + * @param tuple the incoming tuple + */ + protected void processExpired(T tuple) + { + expiredEvents++; + emitExpired(tuple); + } - @Override - public void setup(Context.OperatorContext context) + /** + * Processes the duplicate tuple. 
+ * + * @param tuple + * The tuple which is a duplicate + */ + protected void processDuplicate(T tuple) { - sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); - managedState.setup(context); + if (preserveTupleOrder && !decisions.isEmpty()) { + recordDecision(tuple, Decision.DUPLICATE); + } else { + duplicateEvents++; + emitDuplicate(tuple); + } } - @Override - public void beginWindow(long windowId) + /** + * Processes the unique tuple. + * + * @param tuple + * The tuple which is a unique + */ + protected void processUnique(T tuple) { - managedState.beginWindow(windowId); + if (preserveTupleOrder && !decisions.isEmpty()) { + recordDecision(tuple, Decision.UNIQUE); + } else { + uniqueEvents++; + emitUnique(tuple); + } } + /** + * {@inheritDoc} + */ @Override public void handleIdleTime() { + if (preserveTupleOrder) { + emitProcessedTuples(); + } + processAuxiliary(false); + } + + /** + * Does any auxiliary processing in the idle time of the operator. + * Processes any tuples which are waiting for the lookup to return. 
+ * + * @param finalize Whether or not to wait for future to return + */ + protected void processAuxiliary(boolean finalize) + { if (waitingEvents.size() > 0) { Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator(); while (waitIterator.hasNext()) { Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next(); - if (waitingEvent.getValue().isDone()) { + T tuple = waitingEvent.getKey(); + Slice tupleKey = getKey(tuple); + long tupleTime = getTime(tuple); + Future<Slice> future = waitingEvent.getValue(); + if (future.isDone() || finalize ) { try { - processEvent(waitingEvent.getKey(), waitingEvent.getValue().get()); + if (future.get() == null && asyncEvents.get(tupleKey) == null) { + managedState.put(tupleTime, tupleKey, new Slice(new byte[0])); + asyncEvents.put(tupleKey, tupleTime); + processUnique(tuple); + } else { + processDuplicate(tuple); + } } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("handle idle time", e); } waitIterator.remove(); } - } - } else { - /* nothing to do here, so sleep for a while to avoid busy loop */ - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); + if (!finalize) { + break; + } } } } - protected void processEvent(T tuple, Slice value) + @Override + public void endWindow() { - if (value == BucketedState.EXPIRED) { - return; - } - if (value == null) { - //not a duplicate event - output.emit(tuple); - } else { - if (duplicates.isConnected()) { - duplicates.emit(tuple); - } + processAuxiliary(true); + if (preserveTupleOrder) { + emitProcessedTuples(); } + Preconditions.checkArgument(waitingEvents.isEmpty()); + asyncEvents.clear(); + managedState.endWindow(); } - @Override - public void endWindow() + /** + * Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained. 
+ * + * @param tuple the incoming tuple + * @param d The decision for the tuple + */ + protected void recordDecision(T tuple, Decision d) { - Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator(); - while (waitIterator.hasNext()) { - Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next(); - try { - processEvent(waitingEvent.getKey(), waitingEvent.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("end window", e); - } - waitIterator.remove(); + decisions.put(tuple, d); + } + /** + * Processes tuples for which the decision (unique / duplicate / expired) has been made. + * Breaks once an undecided tuple is found, as we don't want to emit out of order + */ + protected void emitProcessedTuples() + { + Iterator<Entry<T, Decision>> entries = decisions.entrySet().iterator(); + while (entries.hasNext()) { + Entry<T, Decision> td = entries.next(); + switch (td.getValue()) { + case UNIQUE: + uniqueEvents++; + emitUnique(td.getKey()); + entries.remove(); + break; + case DUPLICATE: + duplicateEvents++; + emitDuplicate(td.getKey()); + entries.remove(); + break; + case EXPIRED: + expiredEvents++; + emitExpired(td.getKey()); + entries.remove(); + break; + default: + /* + * Decision for this is still UNKNOWN. Tuple is still waiting for bucket to be loaded. Break. 
+ */ + break; + } } - managedState.endWindow(); } @Override @@ -174,4 +407,49 @@ public abstract class AbstractDeduper<T> implements Operator, Operator.Checkpoin { managedState.committed(windowId); } + + protected void emitUnique(T event) + { + unique.emit(event); + } + + protected void emitDuplicate(T event) + { + duplicate.emit(event); + } + + protected void emitExpired(T event) + { + expired.emit(event); + } + + /** + * Checks whether output of deduper should preserve the input order + */ + public boolean isOrderedOutput() + { + return preserveTupleOrder; + } + + /** + * If set to true, the deduper will emit tuples in the order in which they were received. Tuples which arrived later + * will wait for previous tuples to get processed and emitted. If not set, the order of tuples may change as tuples + * may be emitted out of order as and when they get processed. + * + * @param preserveTupleOrder whether or not to preserve the order of incoming tuples + */ + public void setPreserveTupleOrder(boolean preserveTupleOrder) + { + this.preserveTupleOrder = preserveTupleOrder; + } + + /** + * Enum for holding all possible values for a decision for a tuple + */ + protected enum Decision + { + UNIQUE, DUPLICATE, EXPIRED, UNKNOWN + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractDeduper.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java new file mode 100644 index 0000000..d40a550 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + +/** + * A {@link StreamCodec} for {@link AbstractDeduper}. + * This helps in partitioning the tuples depending on the key field in the tuple. 
+ * The {@link #getPartition(Object)} function returns the hash code of the key field + * + */ +@Evolving +public class DeduperStreamCodec extends KryoSerializableStreamCodec<Object> +{ + private static final long serialVersionUID = -6904078808859412149L; + + private transient Getter<Object, Object> getter; + private String keyExpression; + + public DeduperStreamCodec(String keyExpression) + { + this.keyExpression = keyExpression; + } + + @Override + public int getPartition(Object t) + { + if (getter == null) { + getter = PojoUtils.createGetter(t.getClass(), keyExpression, Object.class); + } + return getter.get(t).hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java new file mode 100644 index 0000000..6aebe6b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import javax.validation.constraints.NotNull; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; +import com.datatorrent.netlet.util.Slice; + +/** + * Time based deduper will de-duplicate incoming POJO tuples and classify them into the following: + * 1. Unique + * 2. Duplicate + * 3. Expired + * + * Since this is de-duplicating in a stream of tuples, and we cannot store all incoming keys indefinitely, + * we use the concept of expiry, where incoming tuples expire after a specified period of time. In this case, + * we choose to expire an entire bucket of data as a unit. This requires the user to specify the bucketing + * structure in advance in order for the operator to function. Here are the parameters for specifying the + * bucketing structure: + * 1. {@link #expireBefore} (in seconds)- This is the total time period during which a tuple stays in the + * system and blocks any other tuple with the same key. + * 2. {@link #bucketSpan} (in seconds) - This is the unit which describes how large a bucket can be. + * Typically this should be defined depending on the use case. 
For example, if we have {@link #expireBefore} + * set to 1 hour, then typically we would be clubbing data in the order of minutes, so a {@link #bucketSpan} of + * around 1 minute or 5 minutes would make sense. Note that in this case, the entire data worth 1 minute or + * 5 minutes will expire as a whole. Setting it to 1 minute would make the number of time buckets in the system + * to be 1 hour / 1 minute = 60 buckets. Similarly setting {@link #bucketSpan} to 5 minutes would make number + * of buckets to be 12. Note that having too many or too less buckets could have a performance impact. If unsure, + * set the {@link #bucketSpan} to be ~ sqrt({@link #expireBefore}). This way the number of buckets and bucket span + * are balanced. + * 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry. + * Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry + * to be from that instant = r. The start and end of the expiry window periodically move by the span of a single + * bucket. Refer {@link TimeBucketAssigner} for details. + * + * Additionally, it also needs the following parameters: + * 1. {@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO) + * 2. {@link #timeExpression} - The java expression to extract the time field in the incoming tuple (POJO). + * In case there is no time field in the tuple, system time, when the tuple is processed, will be used. 
+ * + */ +@Evolving +public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements ActivationListener<Context> +{ + + // Required properties + @NotNull + private String keyExpression; + + private String timeExpression; + + @NotNull + private long bucketSpan; + + @NotNull + private long expireBefore; + + // Optional + private long referenceInstant = new Instant().getMillis() / 1000; + + private transient Class<?> pojoClass; + + private transient Getter<Object, Long> timeGetter; + + private transient Getter<Object, Object> keyGetter; + + @InputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + @Override + public void setup(PortContext context) + { + pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object tuple) + { + processTuple(tuple); + } + + @Override + public StreamCodec<Object> getStreamCodec() + { + return getDeduperStreamCodec(); + } + }; + + @Override + protected long getTime(Object tuple) + { + if (timeGetter != null) { + return timeGetter.get(tuple); + } + return System.currentTimeMillis(); + } + + @Override + protected Slice getKey(Object tuple) + { + Object key = keyGetter.get(tuple); + return new Slice(key.toString().getBytes()); + } + + protected StreamCodec<Object> getDeduperStreamCodec() + { + return new DeduperStreamCodec(keyExpression); + } + + @Override + public void setup(OperatorContext context) + { + TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan)); + timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore)); + timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000)); + managedState.setTimeBucketAssigner(timeBucketAssigner); + super.setup(context); + } + + @Override + public void activate(Context context) + { + if (timeExpression != null) { + timeGetter = 
PojoUtils.createGetter(pojoClass, timeExpression, Long.class); + } else { + timeGetter = null; + } + keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class); + } + + @Override + public void deactivate() + { + } + + public String getKeyExpression() + { + return keyExpression; + } + + /** + * Sets the key expression + * @param keyExpression + */ + public void setKeyExpression(String keyExpression) + { + this.keyExpression = keyExpression; + } + + public String getTimeExpression() + { + return timeExpression; + } + + /** + * Sets the time expression + * @param timeExpression + */ + public void setTimeExpression(String timeExpression) + { + this.timeExpression = timeExpression; + } + + public long getBucketSpan() + { + return bucketSpan; + } + + /** + * Sets the length of a single time bucket (in seconds) + * @param bucketSpan + */ + public void setBucketSpan(long bucketSpan) + { + this.bucketSpan = bucketSpan; + } + + public long getExpireBefore() + { + return expireBefore; + } + + /** + * Sets the expiry time (in seconds). Any event with time before this is considered to be expired. + * @param expireBefore + */ + public void setExpireBefore(long expireBefore) + { + this.expireBefore = expireBefore; + } + + public long getReferenceInstant() + { + return referenceInstant; + } + + /** + * Sets the reference instant (in seconds from the epoch). + * By default this is the time when the application is started. 
+ * @param referenceInstant + */ + public void setReferenceInstant(long referenceInstant) + { + this.referenceInstant = referenceInstant; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java new file mode 100644 index 0000000..544c1df --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperOrderingTest +{ + public static boolean testFailed = false; + + @Test + public void testApplication() throws IOException, Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + DeduperOrderingTestApp app = new DeduperOrderingTestApp(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + app.verifier.latch.await(); + Assert.assertFalse(testFailed); + lc.shutdown(); + } + + public static class DeduperOrderingTestApp implements StreamingApplication + { + Verifier verifier; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomDedupDataGenerator random = dag.addOperator("Input", RandomDedupDataGenerator.class); + + TimeBasedDedupOperator dedup = dag.addOperator("Dedup", TimeBasedDedupOperator.class); + dedup.setKeyExpression("key"); + dedup.setTimeExpression("date.getTime()"); + dedup.setBucketSpan(10); + dedup.setExpireBefore(60); + dedup.setPreserveTupleOrder(true); + FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl(); + fAccessImpl.setBasePath(dag.getAttributes().get(DAG.APPLICATION_PATH) + "/bucket_data"); + 
dedup.managedState.setFileAccess(fAccessImpl); + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestPojo.class); + + verifier = dag.addOperator("Verifier", Verifier.class); + + dag.addStream("Input to Dedup", random.output, dedup.input); + dag.addStream("Dedup to Unique", dedup.unique, verifier.unique); + dag.addStream("Dedup to Duplicate", dedup.duplicate, verifier.duplicate); + dag.addStream("Dedup to Expired", dedup.expired, verifier.expired); + } + } + + public static class RandomDedupDataGenerator extends BaseOperator implements InputOperator + { + private final long count = 500; + private long windowCount = 0; + private long sequenceId = 0; + + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); + + @Override + public void beginWindow(long windowId) + { + windowCount = 0; + } + + @Override + public void emitTuples() + { + if (windowCount < count) { + TestPojo pojo = new TestPojo(sequenceId, new Date(), sequenceId); + output.emit(pojo); + sequenceId++; + } + } + } + + public static class Verifier extends BaseOperator implements StatsListener + { + long prevSequence = 0; + + public transient CountDownLatch latch = new CountDownLatch(1); + @AutoMetric + int count = 0; + public final transient DefaultInputPort<Object> unique = new DefaultInputPort<Object>() + { + @Override + public void process(Object tuple) + { + TestPojo pojo = (TestPojo)tuple; + if (pojo.getSequence() < prevSequence) { + testFailed = true; + } + count++; + prevSequence = pojo.sequence; + } + }; + + public final transient DefaultInputPort<Object> duplicate = new DefaultInputPort<Object>() + { + @Override + public void process(Object tuple) + { + TestPojo pojo = (TestPojo)tuple; + if (pojo.getSequence() < prevSequence) { + testFailed = true; + } + count++; + prevSequence = pojo.sequence; + } + }; + + public final transient DefaultInputPort<Object> expired = new DefaultInputPort<Object>() + { + @Override + public void process(Object tuple) 
+ { + TestPojo pojo = (TestPojo)tuple; + if (pojo.getSequence() < prevSequence) { + testFailed = true; + } + count++; + prevSequence = pojo.sequence; + } + }; + + @Override + public Response processStats(BatchedOperatorStats stats) + { + Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1); + count = (Integer)operatorStats.metrics.get("count"); + if (count >= 1000) { + latch.countDown(); + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java new file mode 100644 index 0000000..ebe5a3e --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned.
+ * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple,
+ * so all tuples with the same key must be processed by the same partition.
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  /** Upper bound on how long the test waits for a partition to process enough tuples. */
+  private static final long TIMEOUT_MILLIS = 60000;
+  /**
+   * Maps each key to the id of the first partition that processed it. It is static so that it is
+   * shared by all partitions, which the local-mode cluster runs inside this JVM. A per-partition
+   * instance field would only ever receive that partition's own id and therefore could never
+   * detect a key that was routed to two different partitions.
+   */
+  private static final ConcurrentMap<Integer, Integer> KEY_TO_PARTITION = new ConcurrentHashMap<>();
+  // Written by operator threads and read by the test thread, hence volatile.
+  private static volatile boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+    TestDeduper dedup;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+      dedup = dag.addOperator("Deduper", new TestDeduper());
+      dedup.setKeyExpression("id");
+      dedup.setTimeExpression("eventTime.getTime()");
+      dedup.setBucketSpan(60);
+      dedup.setExpireBefore(600);
+
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+      dag.addStream("Generator to Dedup", gen.output, dedup.input);
+      dag.addStream("Dedup to Console", dedup.unique, console.input);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner<TimeBasedDedupOperator>(NUM_DEDUP_PARTITIONS));
+    }
+  }
+
+  /**
+   * Deduper whose {@link #processTuple} only checks key-to-partition affinity. Acting as its own
+   * stats listener, it releases {@link #latch} once a partition reports 1000 processed tuples.
+   */
+  public static class TestDeduper extends TimeBasedDedupOperator implements StatsListener
+  {
+    int operatorId;
+    transient CountDownLatch latch = new CountDownLatch(1);
+    int tuplesProcessed = 0;
+    @AutoMetric
+    int tuplesProcessedCompletely = 0;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple)
+    {
+      TestEvent event = (TestEvent)tuple;
+      // The first partition to see a key claims it; any other partition seeing it is a failure.
+      Integer owner = KEY_TO_PARTITION.putIfAbsent(event.id, operatorId);
+      if (owner != null && owner.intValue() != operatorId) {
+        testFailed = true;
+        throw new RuntimeException("Wrong tuple assignment");
+      }
+      tuplesProcessed++;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      tuplesProcessedCompletely = tuplesProcessed;
+    }
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      // Guard against a report without windowed stats, where get(size - 1) would throw.
+      if (stats.getLastWindowedStats().isEmpty()) {
+        return null;
+      }
+      Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+      tuplesProcessedCompletely = (Integer)operatorStats.metrics.get("tuplesProcessedCompletely");
+      if (tuplesProcessedCompletely >= 1000) {
+        latch.countDown();
+      }
+      return null;
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void emitTuples()
+    {
+      TestEvent event = new TestEvent();
+      event.id = r.nextInt(100);
+      // The deduper's time expression is eventTime.getTime(), so never leave eventTime null.
+      event.eventTime = new Date();
+      output.emit(event);
+    }
+  }
+
+  public static class TestEvent
+  {
+    private int id;
+    private Date eventTime;
+
+    public TestEvent()
+    {
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public Date getEventTime()
+    {
+      return eventTime;
+    }
+
+    public void setEventTime(Date eventTime)
+    {
+      this.eventTime = eventTime;
+    }
+  }
+
+  /**
+   * Verifies that every tuple key is processed by exactly one deduper partition.
+   */
+  @Test
+  public void testDeduperStreamCodec() throws Exception
+  {
+    // Reset shared state so that a previous run in the same JVM cannot influence this one.
+    testFailed = false;
+    KEY_TO_PARTITION.clear();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    TestDedupApp app = new TestDedupApp();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    try {
+      boolean done = app.dedup.latch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+      Assert.assertFalse("A key was processed by more than one partition", testFailed);
+      Assert.assertTrue("Timed out waiting for a partition to process 1000 tuples", done);
+    } finally {
+      lc.shutdown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
new file mode 100644
index 0000000..4b25341
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+/**
+ * Tests {@link TimeBasedDedupOperator} on {@link TestPojo} tuples using POJO key and time expressions.
+ */
+public class DeduperTimeBasedPOJOImplTest
+{
+  private static String applicationPath;
+  private static final String APPLICATION_PATH_PREFIX = "target/DeduperPOJOImplTest";
+  private static final String APP_ID = "DeduperPOJOImplTest";
+  private static final int OPERATOR_ID = 0;
+  private static TimeBasedDedupOperator deduper;
+
+  @Before
+  public void setup()
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    deduper = new TimeBasedDedupOperator();
+    deduper.setKeyExpression("key");
+    deduper.setTimeExpression("date.getTime()");
+    deduper.setBucketSpan(10);
+    deduper.setExpireBefore(60);
+    FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+    fAccessImpl.setBasePath(applicationPath + "/bucket_data");
+    deduper.managedState.setFileAccess(fAccessImpl);
+  }
+
+  /**
+   * Sends keys 0..99, one tuple dated before the expiry horizon, then keys 90..199, all in one
+   * window; keys 90..99 repeat, so 200 unique, 10 duplicate and 1 expired tuple are expected.
+   */
+  @Test
+  public void testDedup()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    try {
+      deduper.input.setup(new PortContext(attributes, context));
+      deduper.activate(context);
+      CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+      TestUtils.setSink(deduper.unique, uniqueSink);
+      CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+      TestUtils.setSink(deduper.duplicate, duplicateSink);
+      CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+      TestUtils.setSink(deduper.expired, expiredSink);
+
+      deduper.beginWindow(0);
+
+      long millis = System.currentTimeMillis();
+      for (int i = 0; i < 100; i++) {
+        TestPojo pojo = new TestPojo(i, new Date(millis + i));
+        deduper.input.process(pojo);
+      }
+      // 60 s older than the others, matching setExpireBefore(60) (presumably seconds).
+      TestPojo expiredPojo = new TestPojo(100, new Date(millis - 1000 * 60));
+      deduper.input.process(expiredPojo);
+      for (int i = 90; i < 200; i++) {
+        TestPojo pojo = new TestPojo(i, new Date(millis + i));
+        deduper.input.process(pojo);
+      }
+      deduper.handleIdleTime();
+      deduper.endWindow();
+      Assert.assertEquals(200, uniqueSink.collectedTuples.size());
+      Assert.assertEquals(10, duplicateSink.collectedTuples.size());
+      Assert.assertEquals(1, expiredSink.collectedTuples.size());
+    } finally {
+      // Release the operator's resources even when an assertion above fails.
+      deduper.teardown();
+    }
+  }
+
+  @After
+  public void teardown()
+  {
+    Path root = new Path(applicationPath);
+    // newInstance() returns an uncached FileSystem, so it must be closed here.
+    try (FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration())) {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
new file mode 100644
index 0000000..6311517
--- /dev/null
+++
b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+
+/**
+ * Tuple type for the dedup tests: {@code key} is used as the dedup key, {@code date} supplies the
+ * event time and {@code sequence} records the order in which a generator emitted the tuple.
+ */
+public class TestPojo
+{
+  private long key;
+  private Date date;
+  // Public as well as exposed through accessors: some tests read the field directly.
+  public long sequence;
+
+  /** Creates a tuple with every field left at its default value. */
+  public TestPojo()
+  {
+  }
+
+  /** Creates a tuple with the given key and event time and a sequence of 0. */
+  public TestPojo(long key, Date date)
+  {
+    this(key, date, 0L);
+  }
+
+  /** Creates a fully populated tuple. */
+  public TestPojo(long key, Date date, long sequence)
+  {
+    this.key = key;
+    this.date = date;
+    this.sequence = sequence;
+  }
+
+  public long getKey()
+  {
+    return key;
+  }
+
+  public void setKey(long key)
+  {
+    this.key = key;
+  }
+
+  public Date getDate()
+  {
+    return date;
+  }
+
+  public void setDate(Date date)
+  {
+    this.date = date;
+  }
+
+  public long getSequence()
+  {
+    return sequence;
+  }
+
+  public void setSequence(long sequence)
+  {
+    this.sequence = sequence;
+  }
+
+  @Override
+  public String toString()
+  {
+    return new StringBuilder("TestPojo [key=").append(key)
+        .append(", date=").append(date)
+        .append(", sequence=").append(sequence)
+        .append(']')
+        .toString();
+  }
+}
