This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new d04b40d Introduced build for PlcSubscriptionRequest and PlcUnsubscriptionRequest + generified items for subscription + reordered SubscriptionRequestCyclicItem so that consumer is the last parameter. + adjusted manual test d04b40d is described below commit d04b40dd50d401f06ccc7284b1b388044bfa7214 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu Aug 16 10:35:17 2018 +0200 Introduced build for PlcSubscriptionRequest and PlcUnsubscriptionRequest + generified items for subscription + reordered SubscriptionRequestCyclicItem so that consumer is the last parameter. + adjusted manual test --- .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 9 +++- .../java/api/messages/PlcSubscriptionRequest.java | 51 ++++++++++++++++++- .../api/messages/PlcUnsubscriptionRequest.java | 57 ++++++++++++++++++++++ .../SubscriptionRequestChangeOfStateItem.java | 4 +- .../items/SubscriptionRequestCyclicItem.java | 6 +-- .../items/SubscriptionRequestEventItem.java | 6 +-- .../apache/plc4x/java/ads/ManualPlc4XAdsTest.java | 36 ++++++++------ 7 files changed, 143 insertions(+), 26 deletions(-) diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java index 16fc626..e04e016 100644 --- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java +++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java @@ -29,7 +29,10 @@ import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse; import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest; -import org.apache.plc4x.java.api.messages.items.*; +import org.apache.plc4x.java.api.messages.items.SubscriptionEventItem; +import org.apache.plc4x.java.api.messages.items.SubscriptionRequestCyclicItem; +import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem; +import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem; import org.apache.plc4x.java.api.model.Address; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +86,9 @@ public class Plc4XConsumer extends ServiceSupport implements Consumer, java.util @Override protected void doStart() throws InterruptedException, ExecutionException, TimeoutException { PlcSubscriptionRequest request = new PlcSubscriptionRequest(); - request.addItem(new SubscriptionRequestCyclicItem(dataType, address, this, TimeUnit.SECONDS, 3)); + @SuppressWarnings("unchecked") + SubscriptionRequestCyclicItem subscriptionRequestCyclicItem = new SubscriptionRequestCyclicItem(dataType, address, TimeUnit.SECONDS, 3, this); + request.addItem(subscriptionRequestCyclicItem); CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = getSubscriber().subscribe(request); subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS); } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java index 5fa8ac4..67c4894 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcSubscriptionRequest.java @@ -18,10 +18,59 @@ specific language governing permissions and limitations under the License. */ -import org.apache.plc4x.java.api.messages.items.SubscriptionRequestItem; +import org.apache.plc4x.java.api.messages.items.*; +import org.apache.plc4x.java.api.model.Address; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; public class PlcSubscriptionRequest extends PlcRequest<SubscriptionRequestItem<?>> { + public static PlcSubscriptionRequest.Builder builder() { + return new PlcSubscriptionRequest.Builder(); + } + + public static class Builder extends PlcRequest.Builder<SubscriptionRequestItem> { + + public final <T> Builder addChangeOfStateItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) { + // As we don't get a list as response rather we have individual consumers we don't need type checking here. + //checkType(dataType); + requests.add(new SubscriptionRequestChangeOfStateItem<>(dataType, address, consumer)); + return this; + } + + public final <T> Builder addCyclicItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer, TimeUnit timeUnit, int period) { + // As we don't get a list as response rather we have individual consumers we don't need type checking here. + //checkType(dataType); + requests.add(new SubscriptionRequestCyclicItem<>(dataType, address, timeUnit, period, consumer)); + return this; + } + + public final <T> Builder addEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) { + // As we don't get a list as response rather we have individual consumers we don't need type checking here. + //checkType(dataType); + requests.add(new SubscriptionRequestEventItem<>(dataType, address, consumer)); + return this; + } + + public final Builder addItem(SubscriptionRequestItem subscriptionRequestItem) { + requests.add(subscriptionRequestItem); + return this; + } + + public final PlcSubscriptionRequest build() { + if (requests.isEmpty()) { + throw new IllegalStateException("No requests added"); + } + PlcSubscriptionRequest plcSubscriptionRequest = new PlcSubscriptionRequest(); + for (SubscriptionRequestItem request : requests) { + plcSubscriptionRequest.addItem(request); + } + return plcSubscriptionRequest; + } + + } + @Override public String toString() { return "PlcSubscriptionRequest{} " + super.toString(); diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java index ed12177..bc46db7 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcUnsubscriptionRequest.java @@ -18,12 +18,15 @@ under the License. */ package org.apache.plc4x.java.api.messages; +import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem; import org.apache.plc4x.java.api.messages.items.UnsubscriptionRequestItem; import org.apache.plc4x.java.api.model.SubscriptionHandle; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; public class PlcUnsubscriptionRequest implements PlcMessage { @@ -51,6 +54,60 @@ public class PlcUnsubscriptionRequest implements PlcMessage { return getRequestItems().size(); } + public static PlcUnsubscriptionRequest.Builder builder() { + return new PlcUnsubscriptionRequest.Builder(); + } + + public static class Builder extends PlcRequest.Builder<UnsubscriptionRequestItem> { + + public final Builder addHandle(SubscriptionHandle subscriptionHandle) { + requests.add(new UnsubscriptionRequestItem(subscriptionHandle)); + return this; + } + + public final Builder addHandle(SubscriptionHandle... subscriptionHandles) { + requests.addAll(Arrays.stream(subscriptionHandles).map(UnsubscriptionRequestItem::new).collect(Collectors.toList())); + return this; + } + + public final Builder addHandle(List<SubscriptionHandle> subscriptionHandles) { + requests.addAll(subscriptionHandles.stream().map(UnsubscriptionRequestItem::new).collect(Collectors.toList())); + return this; + } + + public final Builder addHandle(SubscriptionResponseItem subscriptionResponseItem) { + requests.add(new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle())); + return this; + } + + public final Builder addItem(UnsubscriptionRequestItem unsubscriptionRequestItem) { + requests.add(unsubscriptionRequestItem); + return this; + } + + public final Builder addItem(UnsubscriptionRequestItem... unsubscriptionRequestItems) { + requests.addAll(Arrays.asList(unsubscriptionRequestItems)); + return this; + } + + public final Builder addItem(List<UnsubscriptionRequestItem> unsubscriptionRequestItems) { + requests.addAll(unsubscriptionRequestItems); + return this; + } + + public final PlcUnsubscriptionRequest build() { + if (requests.isEmpty()) { + throw new IllegalStateException("No requests added"); + } + PlcUnsubscriptionRequest plcUnsubscriptionRequest = new PlcUnsubscriptionRequest(); + for (UnsubscriptionRequestItem request : requests) { + plcUnsubscriptionRequest.addItem(request); + } + return plcUnsubscriptionRequest; + } + + } + @Override public String toString() { return "PlcUnsubscriptionRequest{" + diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java index 8e4b0eb..aa46b3a 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestChangeOfStateItem.java @@ -23,9 +23,9 @@ import org.apache.plc4x.java.api.model.SubscriptionType; import java.util.function.Consumer; -public class SubscriptionRequestChangeOfStateItem extends SubscriptionRequestItem { +public class SubscriptionRequestChangeOfStateItem<T> extends SubscriptionRequestItem<T> { - public SubscriptionRequestChangeOfStateItem(Class datatype, Address address, Consumer consumer) { + public SubscriptionRequestChangeOfStateItem(Class<T> datatype, Address address, Consumer<SubscriptionEventItem<T>> consumer) { super(datatype, address, SubscriptionType.CHANGE_OF_STATE, consumer); } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java index 22793b2..06f3a77 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestCyclicItem.java @@ -25,13 +25,13 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -public class SubscriptionRequestCyclicItem extends SubscriptionRequestItem { +public class SubscriptionRequestCyclicItem<T> extends SubscriptionRequestItem<T> { private TimeUnit timeUnit; private int period; - public SubscriptionRequestCyclicItem(Class datatype, Address address, Consumer consumer, TimeUnit timeUnit, int period) { - super(datatype, address, SubscriptionType.CYCLIC, consumer); + public SubscriptionRequestCyclicItem(Class<T> dataType, Address address, TimeUnit timeUnit, int period, Consumer<SubscriptionEventItem<T>> consumer) { + super(dataType, address, SubscriptionType.CYCLIC, consumer); this.timeUnit = timeUnit; this.period = period; } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java index e842d88..1abd1b4 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/items/SubscriptionRequestEventItem.java @@ -23,10 +23,10 @@ import org.apache.plc4x.java.api.model.SubscriptionType; import java.util.function.Consumer; -public class SubscriptionRequestEventItem extends SubscriptionRequestItem { +public class SubscriptionRequestEventItem<T> extends SubscriptionRequestItem<T> { - public SubscriptionRequestEventItem(Class datatype, Address address, Consumer consumer) { - super(datatype, address, SubscriptionType.EVENT, consumer); + public SubscriptionRequestEventItem(Class<T> dataType, Address address, Consumer<SubscriptionEventItem<T>> consumer) { + super(dataType, address, SubscriptionType.EVENT, consumer); } @Override diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java index 3511e32..a0e28c8 100644 --- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java +++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/ManualPlc4XAdsTest.java @@ -22,15 +22,17 @@ import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.connection.PlcConnection; import org.apache.plc4x.java.api.connection.PlcReader; import org.apache.plc4x.java.api.connection.PlcSubscriber; -import org.apache.plc4x.java.api.messages.*; -import org.apache.plc4x.java.api.messages.items.*; +import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; +import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest; +import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse; +import org.apache.plc4x.java.api.messages.items.ReadResponseItem; +import org.apache.plc4x.java.api.messages.items.SubscriptionResponseItem; import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadRequest; import org.apache.plc4x.java.api.messages.specific.TypeSafePlcReadResponse; import org.apache.plc4x.java.api.model.Address; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; public class ManualPlc4XAdsTest { @@ -57,20 +59,24 @@ public class ManualPlc4XAdsTest { System.out.println("ResponseItem " + responseItem); responseItem.getValues().stream().map(integer -> "Value: " + integer).forEach(System.out::println); - Consumer<SubscriptionEventItem<Integer>> notificationConsumer = plcNotification -> System.out.println("Received notification " + plcNotification); PlcSubscriber plcSubscriber = plcConnection.getSubscriber().orElseThrow(() -> new RuntimeException("Subscribe not available")); - PlcSubscriptionRequest subscriptionRequest = new PlcSubscriptionRequest(); - subscriptionRequest.addItem(new SubscriptionRequestChangeOfStateItem(Integer.class, address, notificationConsumer)); - CompletableFuture<PlcSubscriptionResponse> subscriptionFuture = plcSubscriber.subscribe(subscriptionRequest); - PlcSubscriptionResponse subscriptionResponse = subscriptionFuture.get(5, TimeUnit.SECONDS); - SubscriptionResponseItem subscriptionResponseItem = subscriptionResponse.getResponseItem().get(); - PlcUnsubscriptionRequest unsubscriptionRequest = new PlcUnsubscriptionRequest(); - unsubscriptionRequest.addItem( - new UnsubscriptionRequestItem(subscriptionResponseItem.getSubscriptionHandle())); - CompletableFuture<PlcUnsubscriptionResponse> unsubscriptionFuture = - plcSubscriber.unsubscribe(unsubscriptionRequest); - PlcUnsubscriptionResponse unsubscriptionResponse = unsubscriptionFuture.get(5, TimeUnit.SECONDS); + PlcSubscriptionRequest subscriptionRequest = PlcSubscriptionRequest.builder() + .addChangeOfStateItem(Integer.class, address, plcNotification -> System.out.println("Received notification " + plcNotification)) + .build(); + + SubscriptionResponseItem subscriptionResponseItem = plcSubscriber.subscribe(subscriptionRequest) + .get(5, TimeUnit.SECONDS) + .getResponseItem().orElseThrow(() -> new RuntimeException("response not available")); + + TimeUnit.SECONDS.sleep(5); + + PlcUnsubscriptionRequest unsubscriptionRequest = PlcUnsubscriptionRequest.builder() + .addHandle(subscriptionResponseItem) + .build(); + + PlcUnsubscriptionResponse unsubscriptionResponse = plcSubscriber.unsubscribe(unsubscriptionRequest) + .get(5, TimeUnit.SECONDS); System.out.println(unsubscriptionResponse); } System.exit(0);