This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f02962d518f88e904a185b7fe69d09a37054a7ad
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Tue Apr 6 11:23:01 2021 -0400

    [WAYANG-28] Add basic logic to HackitSniffer
---
 .../plugin/hackit/core/sniffer/HackitSniffer.java  | 96 +++++++++++++++++++++-
 1 file changed, 95 insertions(+), 1 deletion(-)

diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java
index 9e6c77e..02342ff 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/HackitSniffer.java
@@ -17,5 +17,99 @@
  */
 package org.apache.wayang.plugin.hackit.core.sniffer;
 
-public class HackitSniffer {
+import org.apache.wayang.plugin.hackit.core.sniffer.actor.Actor;
+import org.apache.wayang.plugin.hackit.core.sniffer.clone.Cloner;
+import org.apache.wayang.plugin.hackit.core.sniffer.inject.Injector;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.Shipper;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.sniffer.sniff.Sniff;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.function.Function;
+
+public class HackitSniffer<K, T, SentType, SenderObj extends Sender<SentType>, 
ReceiverObj extends Receiver<HackitTuple<K,T>> > implements 
Function<HackitTuple<K, T>, Iterator<HackitTuple<K, T>>>, Serializable {
+
+    private transient boolean not_first = false;
+    private Injector<HackitTuple<K, T>> hackItInjector;
+
+    private Actor<HackitTuple<K, T>> actorFunction;
+
+    private Shipper<HackitTuple<K, T>, SentType, SenderObj, ReceiverObj> 
shipper;
+
+    private Sniff<HackitTuple<K, T>> hackItSniff;
+    private Cloner<HackitTuple<K, T>, SentType> hackItCloner;
+
+    public HackitSniffer(Injector<HackitTuple<K, T>> hackItInjector, 
Actor<HackitTuple<K, T>> actorFunction, Shipper<HackitTuple<K, T>, SentType, 
SenderObj, ReceiverObj> shipper, Sniff<HackitTuple<K, T>> hackItSniff, 
Cloner<HackitTuple<K, T>, SentType> hackItCloner) {
+        this.hackItInjector = hackItInjector;
+        this.actorFunction = actorFunction;
+        this.shipper = shipper;
+        this.hackItSniff = hackItSniff;
+        this.hackItCloner = hackItCloner;
+        this.not_first = false;
+    }
+
+    public HackitSniffer() {
+        //TODO this over configuration file
+        this.not_first = false;
+    }
+
+    @Override
+    public Iterator<HackitTuple<K, T>> apply(HackitTuple<K, T> ktHackItTuple) {
+        if(!this.not_first){
+            this.shipper.subscribeAsProducer();
+            this.shipper.subscribeAsConsumer();
+            this.not_first = true;
+        }
+
+        if(this.hackItSniff.sniff(ktHackItTuple)){
+            if(this.actorFunction.is_sendout(ktHackItTuple)){
+                this.shipper.publish(
+                        this.hackItCloner.clone(ktHackItTuple)
+                );
+            }
+        }
+        Iterator<HackitTuple<K, T>> inyection = this.shipper.getNexts();
+
+        return this.hackItInjector.inject(ktHackItTuple, inyection);
+    }
+
+    public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> 
setHackItInjector(Injector<HackitTuple<K, T>> hackItInjector) {
+        this.hackItInjector = hackItInjector;
+        return this;
+    }
+
+    public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> 
setActorFunction(Actor<HackitTuple<K, T>> actorFunction) {
+        this.actorFunction = actorFunction;
+        return this;
+    }
+
+    public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> 
setShipper(Shipper<HackitTuple<K, T>, SentType, SenderObj, ReceiverObj> 
shipper) {
+        this.shipper = shipper;
+        return this;
+    }
+
+    public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> 
setHackItSniff(Sniff<HackitTuple<K, T>> hackItSniff) {
+        this.hackItSniff = hackItSniff;
+        return this;
+    }
+
+    public HackitSniffer<K, T, SentType, SenderObj, ReceiverObj> 
setHackItCloner(Cloner<HackitTuple<K, T>, SentType> hackItCloner) {
+        this.hackItCloner = hackItCloner;
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "HackItSniffer{" +
+                "\nfirst=" + not_first +
+                ",\n hackItInjector=" + hackItInjector +
+                ",\n actorFunction=" + actorFunction +
+                ",\n shipper=" + shipper +
+                ",\n hackItSniff=" + hackItSniff +
+                ",\n hackItCloner=" + hackItCloner +
+                "\n}";
+    }
 }

Reply via email to