This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 0efab7baa550b3d8928f335fb45deb9211a342b1
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Tue Apr 6 11:21:27 2021 -0400

    [WAYANG-28] Add Shipper components templates
---
 .../shipper/{Shipper.java => PSProtocol.java}      |  7 +-
 .../hackit/core/sniffer/shipper/Shipper.java       | 89 +++++++++++++++++++++-
 .../{Shipper.java => receiver/BufferReceiver.java} | 31 +++++++-
 .../{Shipper.java => receiver/Receiver.java}       | 15 +++-
 .../shipper/{Shipper.java => sender/Sender.java}   | 12 ++-
 5 files changed, 146 insertions(+), 8 deletions(-)

diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java
similarity index 83%
copy from 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
copy to 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java
index 967f693..6a19794 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java
@@ -17,5 +17,10 @@
  */
 package org.apache.wayang.plugin.hackit.core.sniffer.shipper;
 
-public interface Shipper {
+/**
+ * Publish/subscribe protocol: contract for components that can be bound to an
+ * exchange and to one or more topics.
+ *
+ * <p>Both methods return a {@link PSProtocol}, so calls can be chained.
+ */
+public interface PSProtocol {
+
+    /**
+     * Adds one or more topics.
+     *
+     * @param topic the topics to add
+     * @return a {@link PSProtocol} on which further calls can be chained
+     */
+    PSProtocol addTopic(String... topic);
+
+    /**
+     * Adds an exchange.
+     *
+     * @param exchange the exchange to add
+     * @return a {@link PSProtocol} on which further calls can be chained
+     */
+    PSProtocol addExchange(String exchange);
 }
diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
index 967f693..969d8e8 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
@@ -17,5 +17,92 @@
  */
 package org.apache.wayang.plugin.hackit.core.sniffer.shipper;
 
-public interface Shipper {
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Base class that pairs a {@link Sender} (producer side) with a {@link Receiver}
+ * (consumer side). The class is itself an {@link Iterator} over {@code T}, with
+ * {@link #hasNext()} and {@link #next()} left to subclasses.
+ *
+ * <p>Within this class, neither side is created until the matching
+ * {@code subscribeAs...} method is called; subclasses supply the concrete
+ * instances through {@link #createSenderInstance()} and
+ * {@link #createReceiverInstance()}.
+ *
+ * @param <T> type of the elements returned by this iterator and by {@link #getNexts()}
+ * @param <ST> type of the values accepted by {@link #publish(Object)}
+ * @param <SenderObj> concrete sender type; not referenced inside this class
+ * @param <ReceiverObj> concrete receiver type; not referenced inside this class
+ */
+public abstract class Shipper<T, ST, SenderObj extends Sender<ST>, ReceiverObj extends Receiver<T>>
+        implements Iterator<T>, Serializable {
+
+    /** Sender used by {@link #publish(Object)}; assigned by {@link #subscribeAsProducer()}. */
+    protected Sender sender_instance;
+
+    /** Receiver used by {@link #getNexts()}; assigned by {@link #subscribeAsConsumer()}. */
+    protected Receiver receiver_instance;
+
+    /**
+     * Creates the sender that {@link #subscribeAsProducer()} will initialize.
+     *
+     * @return a new sender
+     */
+    protected abstract Sender createSenderInstance();
+
+    /**
+     * Creates the receiver that {@link #subscribeAsConsumer()} will initialize.
+     *
+     * @return a new receiver
+     */
+    protected abstract Receiver createReceiverInstance();
+
+    /**
+     * Sends a value through the sender.
+     *
+     * @param value the value to send
+     * @throws IllegalStateException if no sender has been assigned yet
+     */
+    public void publish(ST value){
+        if(this.sender_instance == null){
+            throw new IllegalStateException("The Sender of the Shipper is not instanciated");
+        }
+        this.sender_instance.send(value);
+    }
+
+    /** Creates the sender via {@link #createSenderInstance()} and initializes it. */
+    public void subscribeAsProducer(){
+        this.sender_instance = this.createSenderInstance();
+        this.sender_instance.init();
+    }
+
+    /**
+     * Subscribes as a producer on the {@code "default"} exchange.
+     *
+     * @param topic topics to add to the sender
+     */
+    public void subscribeAsProducer(String... topic){
+        this.subscribeAsProducer("default", topic);
+    }
+
+    /**
+     * Creates and initializes the sender, then adds the exchange and the topics to it.
+     *
+     * @param metatopic exchange passed to {@link PSProtocol#addExchange(String)}
+     * @param topic topics passed to {@link PSProtocol#addTopic(String...)}
+     * @throws ClassCastException if the created sender does not implement {@link PSProtocol}
+     */
+    public void subscribeAsProducer(String metatopic, String... topic){
+        this.subscribeAsProducer();
+        ((PSProtocol)this.sender_instance)
+                .addExchange(metatopic)
+                .addTopic(topic)
+        ;
+    }
+
+    /** Closes the sender, if one has been assigned. */
+    public void unsubscribeAsProducer(){
+        if( this.sender_instance == null) return;
+        this.sender_instance.close();
+    }
+
+    /** Creates the receiver via {@link #createReceiverInstance()} and initializes it. */
+    public void subscribeAsConsumer(){
+        this.receiver_instance = this.createReceiverInstance();
+        this.receiver_instance.init();
+    }
+
+    /**
+     * Subscribes as a consumer on the {@code "default"} exchange.
+     *
+     * @param topic topics to add to the receiver
+     */
+    public void subscribeAsConsumer(String... topic){
+        // Must delegate to the consumer overload; delegating to the producer
+        // overload would create a sender and leave the receiver unset.
+        this.subscribeAsConsumer("default", topic);
+    }
+
+    /**
+     * Creates and initializes the receiver, then adds the exchange and the topics to it.
+     * The metatopic corresponds to EXCHANGE_NAME and the topics to bindingKeys.
+     *
+     * @param metatopic exchange passed to {@link PSProtocol#addExchange(String)}
+     * @param topic topics passed to {@link PSProtocol#addTopic(String...)}
+     * @throws ClassCastException if the created receiver does not implement {@link PSProtocol}
+     */
+    public void subscribeAsConsumer(String metatopic, String... topic){
+        this.subscribeAsConsumer();
+        ((PSProtocol)this.receiver_instance)
+                .addExchange(metatopic)
+                .addTopic(topic)
+        ;
+    }
+
+    /** Closes the receiver, if one has been assigned. */
+    public void unsubscribeAsConsumer() {
+        if( this.receiver_instance == null) return;
+        this.receiver_instance.close();
+    }
+
+    /** Closes the consumer side and then the producer side. */
+    public void close(){
+        try {
+            this.unsubscribeAsConsumer();
+        } finally {
+            // Close the producer side even when closing the consumer side fails.
+            this.unsubscribeAsProducer();
+        }
+    }
+
+    @Override
+    public abstract boolean hasNext();
+
+    @Override
+    public abstract T next();
+
+    /**
+     * Returns the elements currently offered by the receiver.
+     *
+     * @return the iterator obtained from {@link Receiver#getElements()}
+     * @throws IllegalStateException if no receiver has been assigned yet
+     */
+    @SuppressWarnings("unchecked") // receiver_instance is declared as a raw Receiver
+    public Iterator<T> getNexts(){
+        if( this.receiver_instance == null){
+            throw new IllegalStateException("The Receiver of the Shipper is not instanciated");
+        }
+        return this.receiver_instance.getElements();
+    }
 }
diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java
similarity index 51%
copy from 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
copy to 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java
index 967f693..0a93dae 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java
@@ -15,7 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.wayang.plugin.hackit.core.sniffer.shipper;
+package org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver;
 
-public interface Shipper {
+import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Buffer that collects received values in a queue.
+ *
+ * <p>The queue is {@code transient} and is created on the first call to
+ * {@link #put(Object)}.
+ *
+ * @param <T> type of the buffered values
+ */
+public class BufferReceiver<T> implements Serializable {
+    //TODO implement the double buffering
+    private transient Queue<T> queue;
+
+    /**
+     * Placeholder: currently does nothing and reports success.
+     *
+     * @return always {@code true}
+     */
+    //TODO implement the server to receive the messages
+    public boolean start(){
+        return true;
+    }
+
+    /**
+     * Placeholder: currently does nothing and reports success.
+     *
+     * @return always {@code true}
+     */
+    //TODO register on the rest of the workers
+    public boolean register(){
+        return true;
+    }
+
+    /**
+     * Currently a fixed answer, independent of the state of the queue.
+     *
+     * @return always {@code false}
+     */
+    public boolean existQueue(){
+        return false;
+    }
+
+    /**
+     * Appends a value to the queue, creating the queue first when it does not exist yet.
+     *
+     * @param value the value to buffer
+     */
+    public void put(T value){
+        Queue<T> target = this.queue;
+        if(target == null){
+            target = new LinkedBlockingQueue<>();
+            this.queue = target;
+        }
+        target.add(value);
+    }
 }
diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java
similarity index 69%
copy from 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
copy to 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java
index 967f693..dd120c0 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java
@@ -15,7 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.wayang.plugin.hackit.core.sniffer.shipper;
+package org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver;
 
-public interface Shipper {
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Contract for the consumer side of a shipper: a component that is
+ * initialized, asked for its elements, and eventually closed.
+ *
+ * @param <T> type of the elements handed out by {@link #getElements()}
+ */
+public abstract class Receiver<T> implements Serializable {
+
+    /** Buffer reserved for received elements; not yet used by this class. */
+    private transient BufferReceiver<T> bufferReceive;
+
+    /** Prepares the receiver for use. */
+    public abstract void init();
+
+    /** Releases whatever the receiver holds. */
+    public abstract void close();
+
+    /**
+     * Gives access to the elements of this receiver.
+     *
+     * @return an iterator over the elements
+     */
+    public abstract Iterator<T> getElements();
 }
diff --git 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java
similarity index 77%
copy from 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
copy to 
wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java
index 967f693..89eb57d 100644
--- 
a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java
+++ 
b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java
@@ -15,7 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.wayang.plugin.hackit.core.sniffer.shipper;
+package org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender;
 
-public interface Shipper {
+import java.io.Serializable;
+
+/**
+ * Contract for the producer side of a shipper: a component that is
+ * initialized, handed values to send, and eventually closed.
+ *
+ * @param <T> type of the values accepted by {@link #send(Object)}
+ */
+public interface Sender<T> extends Serializable {
+
+    /** Prepares the sender for use. */
+    void init();
+
+    /**
+     * Sends a single value.
+     *
+     * @param value the value to send
+     */
+    void send(T value);
+
+    /** Releases whatever the sender holds. */
+    void close();
 }

Reply via email to