This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch feature/notificationSupport in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/feature/notificationSupport by this push: new f5ec9ac implemented some changes to subscription mechanism based on ads protocol f5ec9ac is described below commit f5ec9ac67b8e1b492eaf044c5072bee48f69d007 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu May 24 11:00:52 2018 +0200 implemented some changes to subscription mechanism based on ads protocol --- .../plc4x/java/api/connection/PlcSubscriber.java | 19 ++- .../plc4x/java/api/messages/PlcNotification.java | 49 +++++- .../messages/specific/TypeSafePlcReadRequest.java | 2 +- .../ads/connection/AdsAbstractPlcConnection.java | 6 +- .../java/ads/connection/AdsTcpPlcConnection.java | 164 +++++++++++++++++---- 5 files changed, 200 insertions(+), 40 deletions(-) diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java index 5731654..d024fa0 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java @@ -21,6 +21,7 @@ package org.apache.plc4x.java.api.connection; import org.apache.plc4x.java.api.messages.PlcNotification; import org.apache.plc4x.java.api.model.Address; +import java.util.UUID; import java.util.function.Consumer; /** @@ -28,7 +29,21 @@ import java.util.function.Consumer; */ public interface PlcSubscriber { - void subscribe(Consumer<PlcNotification> consumer, Address address); + /** + * Subscribes a {@code consumer} to a {@code address} parsing values as {@code dataType}. + * + * @param consumer to be subscribed. + * @param address to be read. + * @param dataType to be decoded. + * @param uuid end to end reference + */ + void subscribe(Consumer<PlcNotification<?>> consumer, Address address, Class<?> dataType, UUID uuid); - void unsubscribe(Consumer<PlcNotification> consumer); + + /** + * Unsubscribes a {@code consumer}. + * + * @param consumer to be unsubscribed. + */ + void unsubscribe(Consumer<PlcNotification<?>> consumer); } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java index 149c097..e69788e 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java @@ -20,15 +20,56 @@ package org.apache.plc4x.java.api.messages; import java.util.Date; import java.util.List; +import java.util.Objects; +import java.util.UUID; -public class PlcNotification { +public class PlcNotification<T> { - private final Date timeStamp; + protected final Date timeStamp; - private final List<Object> values; + protected final List<T> values; - public PlcNotification(Date timeStamp, List<Object> values) { + protected final UUID uuid; + + public PlcNotification(Date timeStamp, List<T> values, UUID uuid) { this.timeStamp = timeStamp; this.values = values; + this.uuid = uuid; + } + + public Date getTimeStamp() { + return timeStamp; + } + + public List<T> getValues() { + return values; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PlcNotification)) { + return false; + } + PlcNotification<?> that = (PlcNotification<?>) o; + return Objects.equals(timeStamp, that.timeStamp) && + Objects.equals(values, that.values) && + Objects.equals(uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(timeStamp, values, uuid); + } + + @Override + public String toString() { + return "PlcNotification{" + + "timeStamp=" + timeStamp + + ", values=" + values + + ", uuid=" + uuid + + '}'; } } diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java index a7d8baf..7cee6ad 100644 --- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java +++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java @@ -28,7 +28,7 @@ import java.util.Optional; public class TypeSafePlcReadRequest<T> extends PlcReadRequest { - private Class<T> dataType; + private final Class<T> dataType; public TypeSafePlcReadRequest(Class<T> dataType) { Objects.requireNonNull(dataType, "data type must not be null"); diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java index 37bb3f3..f572731 100644 --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java @@ -48,8 +48,8 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp private static final Logger LOGGER = LoggerFactory.getLogger(AdsAbstractPlcConnection.class); - private static final Configuration CONF = new SystemConfiguration(); - private static final long SYMBOL_RESOLVE_TIMEOUT = CONF.getLong("plc4x.adsconnection.symbol.resolve,timeout", 3000); + protected static final Configuration CONF = new SystemConfiguration(); + protected static final long SYMBOL_RESOLVE_TIMEOUT = CONF.getLong("plc4x.adsconnection.symbol.resolve,timeout", 3000); protected final AmsNetId targetAmsNetId; @@ -139,7 +139,7 @@ public abstract class AdsAbstractPlcConnection extends AbstractPlcConnection imp return sendFuture; } - private void mapAddresses(PlcRequest<?> request) { + protected void mapAddresses(PlcRequest<?> request) { request.getRequestItems().stream() .parallel() .map(RequestItem::getAddress) diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java index 66e58db..62b08fe 100644 --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java @@ -22,40 +22,53 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import org.apache.plc4x.java.ads.api.commands.AdsDeviceNotificationRequest; -import org.apache.plc4x.java.ads.api.commands.types.AdsNotificationSample; -import org.apache.plc4x.java.ads.api.commands.types.AdsStampHeader; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.plc4x.java.ads.api.commands.*; +import org.apache.plc4x.java.ads.api.commands.types.*; import org.apache.plc4x.java.ads.api.generic.types.AmsNetId; import org.apache.plc4x.java.ads.api.generic.types.AmsPort; -import org.apache.plc4x.java.ads.api.util.ByteValue; +import org.apache.plc4x.java.ads.api.generic.types.Invoke; +import org.apache.plc4x.java.ads.model.AdsAddress; +import org.apache.plc4x.java.ads.model.SymbolicAdsAddress; import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol; import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol; import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol; +import org.apache.plc4x.java.ads.protocol.util.LittleEndianDecoder; import org.apache.plc4x.java.api.connection.PlcSubscriber; +import org.apache.plc4x.java.api.exceptions.PlcProtocolException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; -import org.apache.plc4x.java.api.messages.PlcNotification; +import org.apache.plc4x.java.api.messages.*; +import org.apache.plc4x.java.api.messages.items.RequestItem; import org.apache.plc4x.java.api.model.Address; import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements PlcSubscriber { + private static final Logger LOGGER = LoggerFactory.getLogger(AdsTcpPlcConnection.class); + private static final int TCP_PORT = 48898; + private static final long ADD_DEVICE_TIMEOUT = CONF.getLong("plc4x.adsconnection.add.device,timeout", 3000); + private static final long DEL_DEVICE_TIMEOUT = CONF.getLong("plc4x.adsconnection.del.device,timeout", 3000); + private static AtomicInteger localPorts = new AtomicInteger(30000); - private final Map<Consumer<PlcNotification>, Consumer<AdsDeviceNotificationRequest>> subscriberMap = new HashMap<>(); + private final Map<Consumer<? extends PlcNotification>, Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle>> subscriberMap = new HashMap<>(); + + private final Map<NotificationHandle, Consumer<? extends PlcNotification>> handleConsumerMap = new HashMap<>(); private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) { this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort()); @@ -120,29 +133,120 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc } @Override - public void subscribe(Consumer<PlcNotification> consumer, Address address) { - Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = adsDeviceNotificationRequest -> { - for (AdsStampHeader adsStampHeader : adsDeviceNotificationRequest.getAdsStampHeaders()) { - Date timeStamp = adsStampHeader.getTimeStamp().getAsDate(); - // TODO: where do we implement the mapping. Better move it into the ... - List<Object> values = adsStampHeader.getAdsNotificationSamples() - .stream() - .map(AdsNotificationSample::getData) - .map(ByteValue::getBytes) - .map(data -> (Object) data) - .collect(Collectors.toList()); - consumer.accept(new PlcNotification(timeStamp, values)); + public void subscribe(Consumer<PlcNotification<?>> consumer, Address address, Class<?> dataType, UUID uuid) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(address); + IndexGroup indexGroup; + IndexOffset indexOffset; + if (address instanceof SymbolicAdsAddress) { + mapAddresses(new PlcRequest() { + { + requestItems.add(new RequestItem(Void.class, address) { + // Not relevant + }); + } + }); + AdsAddress adsAddress = addressMapping.get(address); + if (adsAddress == null) { + throw new PlcRuntimeException("Unresolvable address" + address); } - }; - subscriberMap.put(consumer, adsDeviceNotificationRequestConsumer); + indexGroup = IndexGroup.of(adsAddress.getIndexGroup()); + indexOffset = IndexOffset.of(adsAddress.getIndexOffset()); + } else if (address instanceof AdsAddress) { + AdsAddress adsAddress = (AdsAddress) address; + indexGroup = IndexGroup.of(adsAddress.getIndexGroup()); + indexOffset = IndexOffset.of(adsAddress.getIndexOffset()); + } else { + throw new IllegalArgumentException("Unssuported address type " + address.getClass()); + } + AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of( + targetAmsNetId, + targetAmsPort, + sourceAmsNetId, + sourceAmsPort, + Invoke.NONE, + indexGroup, + indexOffset, + Length.of(1), + TransmissionMode.of(3), + MaxDelay.of(0), + CycleTime.of(4000000) + ); + + CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> addDeviceFuture = new CompletableFuture<>(); + channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture)); + PlcProprietaryResponse<AdsAddDeviceNotificationResponse> addDeviceResponse; + try { + addDeviceResponse = addDeviceFuture.get(ADD_DEVICE_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted!", e); + Thread.currentThread().interrupt(); + throw new PlcRuntimeException(e); + } catch (ExecutionException | TimeoutException e) { + throw new PlcRuntimeException(e); + } + AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse(); + if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) { + throw new PlcRuntimeException("Non error code received " + response.getResult()); + } + NotificationHandle notificationHandle = response.getNotificationHandle(); + handleConsumerMap.put(notificationHandle, consumer); + + Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = + adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> { + Date timeStamp = adsStampHeader.getTimeStamp().getAsDate(); + + adsStampHeader.getAdsNotificationSamples() + .forEach(adsNotificationSample -> { + Consumer<? extends PlcNotification> plcNotificationConsumer = handleConsumerMap.get(adsNotificationSample.getNotificationHandle()); + if (plcNotificationConsumer == null) { + LOGGER.warn("Unmapped notification received ", adsNotificationSample.getNotificationHandle()); + return; + } + Data data = adsNotificationSample.getData(); + try { + consumer.accept(new PlcNotification<>(timeStamp, LittleEndianDecoder.decodeData(dataType, data.getBytes()), uuid)); + } catch (PlcProtocolException e) { + LOGGER.error("Can't decode {}", data, e); + } + }); + }); + subscriberMap.put(consumer, Pair.of(adsDeviceNotificationRequestConsumer, notificationHandle)); getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer); } @Override - public void unsubscribe(Consumer<PlcNotification> consumer) { - Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = subscriberMap.remove(consumer); - if (adsDeviceNotificationRequestConsumer != null) { - getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(adsDeviceNotificationRequestConsumer); + public void unsubscribe(Consumer<PlcNotification<?>> consumer) { + Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle> handlePair = subscriberMap.remove(consumer); + if (handlePair != null) { + NotificationHandle notificationHandle = handlePair.getRight(); + AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest = AdsDeleteDeviceNotificationRequest.of( + targetAmsNetId, + targetAmsPort, + sourceAmsNetId, + sourceAmsPort, + Invoke.NONE, + notificationHandle + ); + CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture = new CompletableFuture<>(); + channel.writeAndFlush(new PlcRequestContainer<>(new PlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture)); + + PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse; + try { + deleteDeviceResponse = deleteDeviceFuture.get(DEL_DEVICE_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted!", e); + Thread.currentThread().interrupt(); + throw new PlcRuntimeException(e); + } catch (ExecutionException | TimeoutException e) { + throw new PlcRuntimeException(e); + } + AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse(); + if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) { + throw new PlcRuntimeException("Non error code received " + response.getResult()); + } + + getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(handlePair.getLeft()); } } } -- To stop receiving notification emails like this one, please contact sru...@apache.org.