This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push: new 211f609 [ADS] added support for multiple subscriptions. 211f609 is described below commit 211f6092f95e64e2d20eb1e542ccdd4c145f832f Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Fri Oct 19 09:40:55 2018 +0200 [ADS] added support for multiple subscriptions. --- .../java/ads/connection/AdsTcpPlcConnection.java | 160 ++++++++++----------- .../messages/DefaultPlcSubscriptionRequest.java | 15 +- .../messages/DefaultPlcSubscriptionResponse.java | 21 ++- .../messages/InternalPlcSubscriptionRequest.java | 3 + 4 files changed, 105 insertions(+), 94 deletions(-) diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java index 3a78dd8..3b20fd1 100644 --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java @@ -33,7 +33,6 @@ import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol; import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol; import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.*; import org.apache.plc4x.java.api.model.PlcConsumerRegistration; @@ -142,92 +141,87 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc @Override public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest plcSubscriptionRequest) { InternalPlcSubscriptionRequest internalPlcSubscriptionRequest = checkInternal(plcSubscriptionRequest, InternalPlcSubscriptionRequest.class); - // TODO: Make this multi-value CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>(); - if (internalPlcSubscriptionRequest.getNumberOfFields() != 1) { - throw new PlcNotImplementedException("Multirequest on subscribe not implemented yet"); - } - SubscriptionPlcField subscriptionPlcField = internalPlcSubscriptionRequest.getSubscriptionFields().get(0); - PlcField field = subscriptionPlcField.getPlcField(); - - IndexGroup indexGroup; - IndexOffset indexOffset; - AdsDataType adsDataType; - int numberOfElements; - // If this is a symbolic field, it has to be resolved first. - // TODO: This is blocking, should be changed to be async. - if (field instanceof SymbolicAdsField) { - mapFields((SymbolicAdsField) field); - DirectAdsField directAdsField = fieldMapping.get(field); - if (directAdsField == null) { - throw new PlcRuntimeException("Unresolvable field " + field); - } - indexGroup = IndexGroup.of(directAdsField.getIndexGroup()); - indexOffset = IndexOffset.of(directAdsField.getIndexOffset()); - adsDataType = directAdsField.getAdsDataType(); - numberOfElements = directAdsField.getNumberOfElements(); - } - // If it's no symbolic field, we can continue immediately - // without having to do any resolving. - else if (field instanceof DirectAdsField) { - DirectAdsField directAdsField = (DirectAdsField) field; - indexGroup = IndexGroup.of(directAdsField.getIndexGroup()); - indexOffset = IndexOffset.of(directAdsField.getIndexOffset()); - adsDataType = directAdsField.getAdsDataType(); - numberOfElements = directAdsField.getNumberOfElements(); - } else { - throw new IllegalArgumentException("Unsupported field type " + field.getClass()); - } + Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getSubscriptionPlcFieldMap().entrySet().stream() + .map(subscriptionPlcFieldEntry -> { + String plcFieldName = subscriptionPlcFieldEntry.getKey(); + SubscriptionPlcField subscriptionPlcField = subscriptionPlcFieldEntry.getValue(); + PlcField field = subscriptionPlcField.getPlcField(); + + IndexGroup indexGroup; + IndexOffset indexOffset; + AdsDataType adsDataType; + int numberOfElements; + // If this is a symbolic field, it has to be resolved first. + // TODO: This is blocking, should be changed to be async. + if (field instanceof SymbolicAdsField) { + mapFields((SymbolicAdsField) field); + DirectAdsField directAdsField = fieldMapping.get(field); + if (directAdsField == null) { + throw new PlcRuntimeException("Unresolvable field " + field); + } + indexGroup = IndexGroup.of(directAdsField.getIndexGroup()); + indexOffset = IndexOffset.of(directAdsField.getIndexOffset()); + adsDataType = directAdsField.getAdsDataType(); + numberOfElements = directAdsField.getNumberOfElements(); + } + // If it's no symbolic field, we can continue immediately + // without having to do any resolving. + else if (field instanceof DirectAdsField) { + DirectAdsField directAdsField = (DirectAdsField) field; + indexGroup = IndexGroup.of(directAdsField.getIndexGroup()); + indexOffset = IndexOffset.of(directAdsField.getIndexOffset()); + adsDataType = directAdsField.getAdsDataType(); + numberOfElements = directAdsField.getNumberOfElements(); + } else { + throw new IllegalArgumentException("Unsupported field type " + field.getClass()); + } - final TransmissionMode transmissionMode; - long cycleTime = 4000000; - switch (subscriptionPlcField.getPlcSubscriptionType()) { - case CYCLIC: - transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE; - cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS); - break; - case CHANGE_OF_STATE: - transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA; - break; - default: - throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType()); - } + final TransmissionMode transmissionMode; + long cycleTime = 4000000; + switch (subscriptionPlcField.getPlcSubscriptionType()) { + case CYCLIC: + transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE; + cycleTime = subscriptionPlcField.getDuration().orElseThrow(IllegalStateException::new).get(ChronoUnit.MILLIS); + break; + case CHANGE_OF_STATE: + transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA; + break; + default: + throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType()); + } - // Prepare the subscription request itself. - AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of( - targetAmsNetId, - targetAmsPort, - sourceAmsNetId, - sourceAmsPort, - Invoke.NONE, - indexGroup, - indexOffset, - Length.of(adsDataType.getTargetByteSize() * (long) numberOfElements), - transmissionMode, - MaxDelay.of(0), - CycleTime.of(cycleTime) - ); - - // Send the request to the plc and wait for a response - // TODO: This is blocking, should be changed to be async. - CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>(); - channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture)); - InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT); - AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse(); - - // Abort if we got anything but a successful response. - if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) { - throw new PlcRuntimeException("Error code received " + response.getResult()); - } - AdsSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle()); - - Map<String, Pair<PlcResponseCode, PlcSubscriptionHandle>> responseItems = internalPlcSubscriptionRequest.getFieldNames() - .stream() - .collect(Collectors.toMap( - fieldName -> fieldName, - ignored -> Pair.of(PlcResponseCode.OK, adsSubscriptionHandle) - )); + // Prepare the subscription request itself. + AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of( + targetAmsNetId, + targetAmsPort, + sourceAmsNetId, + sourceAmsPort, + Invoke.NONE, + indexGroup, + indexOffset, + Length.of(adsDataType.getTargetByteSize() * (long) numberOfElements), + transmissionMode, + MaxDelay.of(0), + CycleTime.of(cycleTime) + ); + + // Send the request to the plc and wait for a response + // TODO: This is blocking, should be changed to be async. + CompletableFuture<InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>(); + channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture)); + InternalPlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse = getFromFuture(addDeviceFuture, ADD_DEVICE_TIMEOUT); + AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse(); + + // Abort if we got anything but a successful response. + if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) { + throw new PlcRuntimeException("Error code received " + response.getResult()); + } + PlcSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, response.getNotificationHandle()); + return Pair.of(plcFieldName, Pair.of(PlcResponseCode.OK, adsSubscriptionHandle)); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); future.complete(new DefaultPlcSubscriptionResponse(internalPlcSubscriptionRequest, responseItems)); return future; diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java index fd0f595..599e4ad 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionRequest.java @@ -77,6 +77,11 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq } @Override + public LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap() { + return fields; + } + + @Override public LinkedList<Pair<String, PlcField>> getNamedFields() { return fields.entrySet() .stream() @@ -88,7 +93,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq private final PlcSubscriber subscriber; private final PlcFieldHandler fieldHandler; - private final Map<String, BuilderItem<Object>> fields; + private final Map<String, BuilderItem> fields; public Builder(PlcSubscriber subscriber, PlcFieldHandler fieldHandler) { this.subscriber = subscriber; @@ -98,19 +103,19 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq @Override public PlcSubscriptionRequest.Builder addCyclicField(String name, String fieldQuery, Duration pollingInterval) { - fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval)); + fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.CYCLIC, pollingInterval)); return this; } @Override public PlcSubscriptionRequest.Builder addChangeOfStateField(String name, String fieldQuery) { - fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE)); + fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.CHANGE_OF_STATE)); return this; } @Override public PlcSubscriptionRequest.Builder addEventField(String name, String fieldQuery) { - fields.put(name, new BuilderItem<>(fieldQuery, PlcSubscriptionType.EVENT)); + fields.put(name, new BuilderItem(fieldQuery, PlcSubscriptionType.EVENT)); return this; } @@ -125,7 +130,7 @@ public class DefaultPlcSubscriptionRequest implements InternalPlcSubscriptionReq return new DefaultPlcSubscriptionRequest(subscriber, parsedFields); } - private static class BuilderItem<T> { + private static class BuilderItem { private final String fieldQuery; private final PlcSubscriptionType plcSubscriptionType; private final Duration duration; diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java index a8924e9..47a5ce2 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/DefaultPlcSubscriptionResponse.java @@ -20,6 +20,7 @@ package org.apache.plc4x.java.base.messages; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; +import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; import org.apache.plc4x.java.api.model.PlcField; import org.apache.plc4x.java.api.model.PlcSubscriptionHandle; @@ -42,8 +43,14 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe @Override public PlcSubscriptionHandle getSubscriptionHandle(String name) { - // TODO: add safety - return values.get(name).getValue(); + Pair<PlcResponseCode, PlcSubscriptionHandle> response = values.get(name); + if (response == null) { + return null; + } + if (response.getKey() != PlcResponseCode.OK) { + throw new PlcRuntimeException("Item " + name + " failed to subscribe: " + response.getKey()); + } + return response.getValue(); } @Override @@ -53,14 +60,16 @@ public class DefaultPlcSubscriptionResponse implements InternalPlcSubscriptionRe @Override public PlcField getField(String name) { - // TODO: or should subscription handle be a successor of PlcField? - throw new NotImplementedException("field access not implemented"); + throw new NotImplementedException("field access not possible as these come async"); } @Override public PlcResponseCode getResponseCode(String name) { - // TODO: add safety - return values.get(name).getKey(); + Pair<PlcResponseCode, PlcSubscriptionHandle> response = values.get(name); + if (response == null) { + return null; + } + return response.getKey(); } @Override diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java index 608691a..6dcb30d 100644 --- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java +++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/InternalPlcSubscriptionRequest.java @@ -21,9 +21,12 @@ package org.apache.plc4x.java.base.messages; import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest; import org.apache.plc4x.java.base.model.SubscriptionPlcField; +import java.util.LinkedHashMap; import java.util.LinkedList; public interface InternalPlcSubscriptionRequest extends PlcSubscriptionRequest, InternalPlcFieldRequest { LinkedList<SubscriptionPlcField> getSubscriptionFields(); + + LinkedHashMap<String, SubscriptionPlcField> getSubscriptionPlcFieldMap(); }