This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 211f609  [ADS] added support for multiple subscriptions.
211f609 is described below

commit 211f6092f95e64e2d20eb1e542ccdd4c145f832f
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Fri Oct 19 09:40:55 2018 +0200

    [ADS] added support for multiple subscriptions.
---
 .../java/ads/connection/AdsTcpPlcConnection.java   | 160 ++++++++++-----------
 .../messages/DefaultPlcSubscriptionRequest.java    |  15 +-
 .../messages/DefaultPlcSubscriptionResponse.java   |  21 ++-
 .../messages/InternalPlcSubscriptionRequest.java   |   3 +
 4 files changed, 105 insertions(+), 94 deletions(-)

diff --git 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 3a78dd8..3b20fd1 100644
--- 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -33,7 +33,6 @@ import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.*;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
@@ -142,92 +141,87 @@ public class AdsTcpPlcConnection extends 
AdsAbstractPlcConnection implements Plc
     @Override
     public CompletableFuture<PlcSubscriptionResponse> 
subscribe(PlcSubscriptionRequest plcSubscriptionRequest) {
         InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = 
checkInternal(plcSubscriptionRequest, InternalPlcSubscriptionRequest.class);
-        // TODO: Make this multi-value
         CompletableFuture<PlcSubscriptionResponse> future = new 
CompletableFuture<>();
-        if (internalPlcSubscriptionRequest.getNumberOfFields() != 1) {
-            throw new PlcNotImplementedException("Multirequest on subscribe 
not implemented yet");
-        }
 
-        SubscriptionPlcField subscriptionPlcField = 
internalPlcSubscriptionRequest.getSubscriptionFields().get(0);
-        PlcField field = subscriptionPlcField.getPlcField();
-
-        IndexGroup indexGroup;
-        IndexOffset indexOffset;
-        AdsDataType adsDataType;
-        int numberOfElements;
-        // If this is a symbolic field, it has to be resolved first.
-        // TODO: This is blocking, should be changed to be async.
-        if (field instanceof SymbolicAdsField) {
-            mapFields((SymbolicAdsField) field);
-            DirectAdsField directAdsField = fieldMapping.get(field);
-            if (directAdsField == null) {
-                throw new PlcRuntimeException("Unresolvable field " + field);
-            }
-            indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
-            indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
-            adsDataType = directAdsField.getAdsDataType();
-            numberOfElements = directAdsField.getNumberOfElements();
-        }
-        // If it's no symbolic field, we can continue immediately
-        // without having to do any resolving.
-        else if (field instanceof DirectAdsField) {
-            DirectAdsField directAdsField = (DirectAdsField) field;
-            indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
-            indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
-            adsDataType = directAdsField.getAdsDataType();
-            numberOfElements = directAdsField.getNumberOfElements();
-        } else {
-            throw new IllegalArgumentException("Unsupported field type " + 
field.getClass());
-        }
+        Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> 
responseItems = 
internalPlcSubscriptionRequest.getSubscriptionPlcFieldMap().entrySet().stream()
+            .map(subscriptionPlcFieldEntry -> {
+                String plcFieldName = subscriptionPlcFieldEntry.getKey();
+                SubscriptionPlcField subscriptionPlcField = 
subscriptionPlcFieldEntry.getValue();
+                PlcField field = subscriptionPlcField.getPlcField();
+
+                IndexGroup indexGroup;
+                IndexOffset indexOffset;
+                AdsDataType adsDataType;
+                int numberOfElements;
+                // If this is a symbolic field, it has to be resolved first.
+                // TODO: This is blocking, should be changed to be async.
+                if (field instanceof SymbolicAdsField) {
+                    mapFields((SymbolicAdsField) field);
+                    DirectAdsField directAdsField = fieldMapping.get(field);
+                    if (directAdsField == null) {
+                        throw new PlcRuntimeException("Unresolvable field " + 
field);
+                    }
+                    indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
+                    indexOffset = 
IndexOffset.of(directAdsField.getIndexOffset());
+                    adsDataType = directAdsField.getAdsDataType();
+                    numberOfElements = directAdsField.getNumberOfElements();
+                }
+                // If it's no symbolic field, we can continue immediately
+                // without having to do any resolving.
+                else if (field instanceof DirectAdsField) {
+                    DirectAdsField directAdsField = (DirectAdsField) field;
+                    indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
+                    indexOffset = 
IndexOffset.of(directAdsField.getIndexOffset());
+                    adsDataType = directAdsField.getAdsDataType();
+                    numberOfElements = directAdsField.getNumberOfElements();
+                } else {
+                    throw new IllegalArgumentException("Unsupported field type 
" + field.getClass());
+                }
 
-        final TransmissionMode transmissionMode;
-        long cycleTime = 4000000;
-        switch (subscriptionPlcField.getPlcSubscriptionType()) {
-            case CYCLIC:
-                transmissionMode = 
TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
-                cycleTime = 
subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
-                break;
-            case CHANGE_OF_STATE:
-                transmissionMode = 
TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
-                break;
-            default:
-                throw new PlcRuntimeException("Unmapped type " + 
subscriptionPlcField.getPlcSubscriptionType());
-        }
+                final TransmissionMode transmissionMode;
+                long cycleTime = 4000000;
+                switch (subscriptionPlcField.getPlcSubscriptionType()) {
+                    case CYCLIC:
+                        transmissionMode = 
TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
+                        cycleTime = 
subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS);
+                        break;
+                    case CHANGE_OF_STATE:
+                        transmissionMode = 
TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
+                        break;
+                    default:
+                        throw new PlcRuntimeException("Unmapped type " + 
subscriptionPlcField.getPlcSubscriptionType());
+                }
 
-        // Prepare the subscription request itself.
-        AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = 
AdsAddDeviceNotificationRequest.of(
-            targetAmsNetId,
-            targetAmsPort,
-            sourceAmsNetId,
-            sourceAmsPort,
-            Invoke.NONE,
-            indexGroup,
-            indexOffset,
-            Length.of(adsDataType.getTargetByteSize() * (long) 
numberOfElements),
-            transmissionMode,
-            MaxDelay.of(0),
-            CycleTime.of(cycleTime)
-        );
-
-        // Send the request to the plc and wait for a response
-        // TODO: This is blocking, should be changed to be async.
-        
CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>>
 addDeviceFuture = new CompletableFuture<>();
-        channel.writeAndFlush(new PlcRequestContainer<>(new 
DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), 
addDeviceFuture));
-        InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> 
addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
-        AdsAddDeviceNotificationResponse response = 
addDeviceResponse.getResponse();
-
-        // Abort if we got anything but a successful response.
-        if (response.getResult().toAdsReturnCode() != 
AdsReturnCode.ADS_CODE_0) {
-            throw new PlcRuntimeException("Error code received " + 
response.getResult());
-        }
-        AdsSubscriptionHandle adsSubscriptionHandle = new 
AdsSubscriptionHandle(this, response.getNotificationHandle());
-
-        Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> 
responseItems = internalPlcSubscriptionRequest.getFieldNames()
-            .stream()
-            .collect(Collectors.toMap(
-                fieldName -> fieldName,
-                ignored -> Pair.of(PlcResponseCode.OK, adsSubscriptionHandle)
-            ));
+                // Prepare the subscription request itself.
+                AdsAddDeviceNotificationRequest 
adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
+                    targetAmsNetId,
+                    targetAmsPort,
+                    sourceAmsNetId,
+                    sourceAmsPort,
+                    Invoke.NONE,
+                    indexGroup,
+                    indexOffset,
+                    Length.of(adsDataType.getTargetByteSize() * (long) 
numberOfElements),
+                    transmissionMode,
+                    MaxDelay.of(0),
+                    CycleTime.of(cycleTime)
+                );
+
+                // Send the request to the plc and wait for a response
+                // TODO: This is blocking, should be changed to be async.
+                
CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>>
 addDeviceFuture = new CompletableFuture<>();
+                channel.writeAndFlush(new PlcRequestContainer<>(new 
DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), 
addDeviceFuture));
+                
InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> 
addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT);
+                AdsAddDeviceNotificationResponse response = 
addDeviceResponse.getResponse();
+
+                // Abort if we got anything but a successful response.
+                if (response.getResult().toAdsReturnCode() != 
AdsReturnCode.ADS_CODE_0) {
+                    throw new PlcRuntimeException("Error code received " + 
response.getResult());
+                }
+                PlcSubscriptionHandle adsSubscriptionHandle = new 
AdsSubscriptionHandle(this, response.getNotificationHandle());
+                return Pair.of(plcFieldName, Pair.of(PlcResponseCode.OK, 
adsSubscriptionHandle));
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
         future.complete(new 
DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, responseItems));
         return future;
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
index fd0f595..599e4ad 100644
--- 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java
@@ -77,6 +77,11 @@ public class DefaultPlcSubscriptionRequest implements 
InternalPlcSubscriptionReq
     }
 
     @Override
+    public LinkedHashMap<String, SubscriptionPlcField> 
getSubscriptionPlcFieldMap() {
+        return fields;
+    }
+
+    @Override
     public LinkedList<Pair<String, PlcField>> getNamedFields() {
         return fields.entrySet()
             .stream()
@@ -88,7 +93,7 @@ public class DefaultPlcSubscriptionRequest implements 
InternalPlcSubscriptionReq
 
         private final PlcSubscriber subscriber;
         private final PlcFieldHandler fieldHandler;
-        private final Map<String, BuilderItem<Object>> fields;
+        private final Map<String, BuilderItem> fields;
 
         public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) 
{
             this.subscriber = subscriber;
@@ -98,19 +103,19 @@ public class DefaultPlcSubscriptionRequest implements 
InternalPlcSubscriptionReq
 
         @Override
         public PlcSubscriptionRequest.Builder addCyclicField(String name, 
String fieldQuery, Duration pollingInterval) {
-            fields.put(name, new BuilderItem<>(fieldQuery, 
PlcSubscriptionType.CYCLIC, pollingInterval));
+            fields.put(name, new BuilderItem(fieldQuery, 
PlcSubscriptionType.CYCLIC, pollingInterval));
             return this;
         }
 
         @Override
         public PlcSubscriptionRequest.Builder addChangeOfStateField(String 
name, String fieldQuery) {
-            fields.put(name, new BuilderItem<>(fieldQuery, 
PlcSubscriptionType.CHANGE_OF_STATE));
+            fields.put(name, new BuilderItem(fieldQuery, 
PlcSubscriptionType.CHANGE_OF_STATE));
             return this;
         }
 
         @Override
         public PlcSubscriptionRequest.Builder addEventField(String name, 
String fieldQuery) {
-            fields.put(name, new BuilderItem<>(fieldQuery, 
PlcSubscriptionType.EVENT));
+            fields.put(name, new BuilderItem(fieldQuery, 
PlcSubscriptionType.EVENT));
             return this;
         }
 
@@ -125,7 +130,7 @@ public class DefaultPlcSubscriptionRequest implements 
InternalPlcSubscriptionReq
             return new DefaultPlcSubscriptionRequest(subscriber, parsedFields);
         }
 
-        private static class BuilderItem<T> {
+        private static class BuilderItem {
             private final String fieldQuery;
             private final PlcSubscriptionType plcSubscriptionType;
             private final Duration duration;
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
index a8924e9..47a5ce2 100644
--- 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.java.base.messages;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.api.model.PlcField;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
@@ -42,8 +43,14 @@ public class DefaultPlcSubscriptionResponse implements 
InternalPlcSubscriptionRe
 
     @Override
     public PlcSubscriptionHandle getSubscriptionHandle(String name) {
-        // TODO: add safety
-        return values.get(name).getValue();
+        Pair<PlcResponseCode, PlcSubscriptionHandle> response = 
values.get(name);
+        if (response == null) {
+            return null;
+        }
+        if (response.getKey() != PlcResponseCode.OK) {
+            throw new PlcRuntimeException("Item " + name + " failed to 
subscribe: " + response.getKey());
+        }
+        return response.getValue();
     }
 
     @Override
@@ -53,14 +60,16 @@ public class DefaultPlcSubscriptionResponse implements 
InternalPlcSubscriptionRe
 
     @Override
     public PlcField getField(String name) {
-        // TODO: or should subscription handle be a successor of PlcField?
-        throw new NotImplementedException("field access not implemented");
+        throw new NotImplementedException("field access not possible as these 
come async");
     }
 
     @Override
     public PlcResponseCode getResponseCode(String name) {
-        // TODO: add safety
-        return values.get(name).getKey();
+        Pair<PlcResponseCode, PlcSubscriptionHandle> response = 
values.get(name);
+        if (response == null) {
+            return null;
+        }
+        return response.getKey();
     }
 
     @Override
diff --git 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
index 608691a..6dcb30d 100644
--- 
a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
+++ 
b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java
@@ -21,9 +21,12 @@ package org.apache.plc4x.java.base.messages;
 import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
 import org.apache.plc4x.java.base.model.SubscriptionPlcField;
 
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 
 public interface InternalPlcSubscriptionRequest extends 
PlcSubscriptionRequest, InternalPlcFieldRequest {
 
     LinkedList<SubscriptionPlcField> getSubscriptionFields();
+
+    LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap();
 }

Reply via email to