Repository: apex-malhar Updated Branches: refs/heads/master 0ec1433b2 -> 7dea3d0a0
APEXMALHAR-1701 An abstract deduper implemenatation Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a1f62669 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a1f62669 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a1f62669 Branch: refs/heads/master Commit: a1f62669aaa46d2e38c42a8755baf4dc66d3246f Parents: 0ec1433 Author: Chandni Singh <[email protected]> Authored: Fri Apr 29 11:43:33 2016 -0700 Committer: bhupeshchawda <[email protected]> Committed: Tue Aug 2 17:29:22 2016 +0530 ---------------------------------------------------------------------- .../apex/malhar/lib/dedup/AbstractDeduper.java | 177 +++++++++++++++++++ 1 file changed, 177 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1f62669/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java new file mode 100644 index 0000000..d23a28a --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.netlet.util.Slice; + +/** + * An operator that de-dupes a stream. + * + * @param <T> type of events + */ +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener, + Operator.IdleTimeHandler +{ + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + private transient long sleepMillis; + + private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap(); + + public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>(); + + public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>(); + + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + long time = getTime(tuple); + Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple)); + if (valFuture.isDone()) { + try { + processEvent(tuple, valFuture.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("process", e); + } + } else { + waitingEvents.put(tuple, valFuture); + } + } + }; + + protected abstract long getTime(T tuple); + + protected abstract Slice getKey(T tuple); + + @Override + public void setup(Context.OperatorContext context) + { + sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); + managedState.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + managedState.beginWindow(windowId); + } + + @Override + public void handleIdleTime() + { + if (waitingEvents.size() > 0) { + Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator(); + while (waitIterator.hasNext()) { + Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next(); + if (waitingEvent.getValue().isDone()) { + try { + processEvent(waitingEvent.getKey(), waitingEvent.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("handle idle time", e); + } + waitIterator.remove(); + } + } + } else { + /* nothing to do here, so sleep for a while to avoid busy loop */ + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + } + } + + protected void processEvent(T tuple, Slice value) + { + if (value == BucketedState.EXPIRED) { + return; + } + if (value == null) { + //not a duplicate event + output.emit(tuple); + } else { + if (duplicates.isConnected()) { + duplicates.emit(tuple); + } + } + } + + @Override + public void endWindow() + { + Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator(); + while (waitIterator.hasNext()) { + Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next(); + try { + processEvent(waitingEvent.getKey(), waitingEvent.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("end window", e); + } + waitIterator.remove(); + + } + managedState.endWindow(); + } + + @Override + public void teardown() + { + managedState.teardown(); + } + + @Override + public void beforeCheckpoint(long windowId) + { + managedState.beforeCheckpoint(windowId); + } + + @Override + public void checkpointed(long windowId) + { + managedState.checkpointed(windowId); + } + + @Override + public void committed(long windowId) + { + managedState.committed(windowId); + } +}
