This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch debugger in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 0efab7baa550b3d8928f335fb45deb9211a342b1 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Tue Apr 6 11:21:27 2021 -0400 [WAYANG-28] Add Shipper components templates --- .../shipper/{Shipper.java => PSProtocol.java} | 7 +- .../hackit/core/sniffer/shipper/Shipper.java | 89 +++++++++++++++++++++- .../{Shipper.java => receiver/BufferReceiver.java} | 31 +++++++- .../{Shipper.java => receiver/Receiver.java} | 15 +++- .../shipper/{Shipper.java => sender/Sender.java} | 12 ++- 5 files changed, 146 insertions(+), 8 deletions(-) diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java similarity index 83% copy from wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java copy to wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java index 967f693..6a19794 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/PSProtocol.java @@ -17,5 +17,10 @@ */ package org.apache.wayang.plugin.hackit.core.sniffer.shipper; -public interface Shipper { +/** + * Publish and Subscribed Protocol + */ +public interface PSProtocol { + public PSProtocol addTopic(String... topic); + public PSProtocol addExchange(String exchange); } diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java index 967f693..969d8e8 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java @@ -17,5 +17,92 @@ */ package org.apache.wayang.plugin.hackit.core.sniffer.shipper; -public interface Shipper { +import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver; +import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender; + +import java.io.Serializable; +import java.util.Iterator; + +public abstract class Shipper<T, ST, SenderObj extends Sender<ST>, ReceiverObj extends Receiver<T>> implements Iterator<T>, Serializable { + + protected Sender sender_instance; + protected Receiver receiver_instance; + + protected abstract Sender createSenderInstance(); + protected abstract Receiver createReceiverInstance(); + + /** Connect with the a Message queue service*/ + public void publish(ST value){ + if(this.sender_instance == null){ + throw new RuntimeException("The Sender of the Shipper is not instanciated"); + } + this.sender_instance.send(value); + } + + /** To subscribe as a producer */ + public void subscribeAsProducer(){ + this.sender_instance = this.createSenderInstance(); + this.sender_instance.init(); + } + + public void subscribeAsProducer(String... topic){ + this.subscribeAsProducer("default", topic); + } + + public void subscribeAsProducer(String metatopic, String... topic){ + this.subscribeAsProducer(); + ((PSProtocol)this.sender_instance) + .addExchange(metatopic) + .addTopic(topic) + ; + } + + /** Close connection */ + public void unsubscribeAsProducer(){ + if( this.sender_instance == null) return; + this.sender_instance.close(); + } + + /** To subscribe/unsubscribe as a consumer + * metatopic correspond to EXCHANGE_NAME + * topics correspond to bindingKeys + * */ + public void subscribeAsConsumer(){ + this.receiver_instance = this.createReceiverInstance(); + this.receiver_instance.init(); + } + public void subscribeAsConsumer(String... topic){ + this.subscribeAsProducer("default", topic); + } + public void subscribeAsConsumer(String metatopic, String... topic){ + this.subscribeAsConsumer(); + ((PSProtocol)this.receiver_instance) + .addExchange(metatopic) + .addTopic(topic) + ; + } + + /** Close connection */ + public void unsubscribeAsConsumer() { + if( this.receiver_instance == null) return; + this.receiver_instance.close(); + } + + public void close(){ + this.unsubscribeAsConsumer(); + this.unsubscribeAsProducer(); + } + + @Override + public abstract boolean hasNext(); + + @Override + public abstract T next(); + + public Iterator<T> getNexts(){ + if( this.receiver_instance == null){ + throw new RuntimeException("The Receiver of the Shipper is not instanciated"); + } + return this.receiver_instance.getElements(); + } } diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java similarity index 51% copy from wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java copy to wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java index 967f693..0a93dae 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/BufferReceiver.java @@ -15,7 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.wayang.plugin.hackit.core.sniffer.shipper; +package org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver; -public interface Shipper { +import java.io.Serializable; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +public class BufferReceiver<T> implements Serializable { + //TODO implement the doble buffering + private transient Queue<T> queue; + + //TODO implement the server to receive the messages + public boolean start(){ + return true; + } + + //TODO registrer on the rest of the worker + public boolean register(){ + return true; + } + + public boolean existQueue(){ + return false; + } + + public void put(T value){ + if(this.queue == null){ + this.queue = new LinkedBlockingQueue<>(); + } + this.queue.add(value); + } } diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java similarity index 69% copy from wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java copy to wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java index 967f693..dd120c0 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/receiver/Receiver.java @@ -15,7 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.wayang.plugin.hackit.core.sniffer.shipper; +package org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver; -public interface Shipper { +import java.io.Serializable; +import java.util.Iterator; + +public abstract class Receiver<T> implements Serializable { + + private transient BufferReceiver<T> bufferReceive; + + public abstract void init(); + + public abstract Iterator<T> getElements(); + + public abstract void close(); } diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java similarity index 77% copy from wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java copy to wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java index 967f693..89eb57d 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/Shipper.java +++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/sniffer/shipper/sender/Sender.java @@ -15,7 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.wayang.plugin.hackit.core.sniffer.shipper; +package org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender; -public interface Shipper { +import java.io.Serializable; + +public interface Sender<T> extends Serializable { + + public void init(); + + public void send(T value); + + public void close(); }
