This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch debugger in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f02962d518f88e904a185b7fe69d09a37054a7ad Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Tue Apr 6 11:23:01 2021 -0400 [WAYANG-28] Add basic logic to HackitSniffer --- .../plugin/hackit/core/sniffer/HackitSniffer.java | 96 +++++++++++++++++++++- 1 file changed, 95 insertions(+), 1 deletion(-) diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java index 9e6c77e..02342ff 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java @@ -17,5 +17,99 @@ */ package org.apache.wayang.plugin.hackit.core.sniffer; -public class HackitSniffer { +import org.apache.wayang.plugin.hackit.core.sniffer.actor.Actor; +import org.apache.wayang.plugin.hackit.core.sniffer.clone.Cloner; +import org.apache.wayang.plugin.hackit.core.sniffer.inject.Injector; +import org.apache.wayang.plugin.hackit.core.sniffer.shipper.Shipper; +import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender; +import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver; +import org.apache.wayang.plugin.hackit.core.sniffer.sniff.Sniff; +import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.function.Function; + +public class HackitSniffer<K, T, SentType, SenderObj extends Sender<SentType>, ReceiverObj extends Receiver<HackitTuple<K,T>> > implements Function<HackitTuple<K, T>, Iterator<HackitTuple<K, T>>>, Serializable { + + private transient boolean not_first = false; + private Injector<HackitTuple<K, T>> hackItInjector; + + private Actor<HackitTuple<K, T>> actorFunction; + + private Shipper<HackitTuple<K, T>, SentType, SenderObj, ReceiverObj> shipper; + + private Sniff<HackitTuple<K, T>> hackItSniff; + private Cloner<HackitTuple<K, T>, SentType> hackItCloner; + + public HackitSniffer(Injector<HackitTuple<K, T>> hackItInjector, Actor<HackitTuple<K, T>> actorFunction, Shipper<HackitTuple<K, T>, SentType, SenderObj, ReceiverObj> shipper, Sniff<HackitTuple<K, T>> hackItSniff, Cloner<HackitTuple<K, T>, SentType> hackItCloner) { + this.hackItInjector = hackItInjector; + this.actorFunction = actorFunction; + this.shipper = shipper; + this.hackItSniff = hackItSniff; + this.hackItCloner = hackItCloner; + this.not_first = false; + } + + public HackitSniffer() { + //TODO this over configuration file + this.not_first = false; + } + + @Override + public Iterator<HackitTuple<K, T>> apply(HackitTuple<K, T> ktHackItTuple) { + if(!this.not_first){ + this.shipper.subscribeAsProducer(); + this.shipper.subscribeAsConsumer(); + this.not_first = true; + } + + if(this.hackItSniff.sniff(ktHackItTuple)){ + if(this.actorFunction.is_sendout(ktHackItTuple)){ + this.shipper.publish( + this.hackItCloner.clone(ktHackItTuple) + ); + } + } + Iterator<HackitTuple<K, T>> inyection = this.shipper.getNexts(); + + return this.hackItInjector.inject(ktHackItTuple, inyection); + } + + public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> setHackItInjector(Injector<HackitTuple<K, T>> hackItInjector) { + this.hackItInjector = hackItInjector; + return this; + } + + public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> setActorFunction(Actor<HackitTuple<K, T>> actorFunction) { + this.actorFunction = actorFunction; + return this; + } + + public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> setShipper(Shipper<HackitTuple<K, T>, SentType, SenderObj, ReceiverObj> shipper) { + this.shipper = shipper; + return this; + } + + public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> setHackItSniff(Sniff<HackitTuple<K, T>> hackItSniff) { + this.hackItSniff = hackItSniff; + return this; + } + + public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> setHackItCloner(Cloner<HackitTuple<K, T>, SentType> hackItCloner) { + this.hackItCloner = hackItCloner; + return this; + } + + @Override + public String toString() { + return "HackItSniffer{" + + "\nfirst=" + not_first + + ",\n hackItInjector=" + hackItInjector + + ",\n actorFunction=" + actorFunction + + ",\n shipper=" + shipper + + ",\n hackItSniff=" + hackItSniff + + ",\n hackItCloner=" + hackItCloner + + "\n}"; + } }
