This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/notificationSupport
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/feature/notificationSupport by 
this push:
     new f5ec9ac  implemented some changes to subscription mechanism based on 
ads protocol
f5ec9ac is described below

commit f5ec9ac67b8e1b492eaf044c5072bee48f69d007
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu May 24 11:00:52 2018 +0200

    implemented some changes to subscription mechanism based on ads protocol
---
 .../plc4x/java/api/connection/PlcSubscriber.java   |  19 ++-
 .../plc4x/java/api/messages/PlcNotification.java   |  49 +++++-
 .../messages/specific/TypeSafePlcReadRequest.java  |   2 +-
 .../ads/connection/AdsAbstractPlcConnection.java   |   6 +-
 .../java/ads/connection/AdsTcpPlcConnection.java   | 164 +++++++++++++++++----
 5 files changed, 200 insertions(+), 40 deletions(-)

diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
index 5731654..d024fa0 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -21,6 +21,7 @@ package org.apache.plc4x.java.api.connection;
 import org.apache.plc4x.java.api.messages.PlcNotification;
 import org.apache.plc4x.java.api.model.Address;
 
+import java.util.UUID;
 import java.util.function.Consumer;
 
 /**
@@ -28,7 +29,21 @@ import java.util.function.Consumer;
  */
 public interface PlcSubscriber {
 
-    void subscribe(Consumer<PlcNotification> consumer, Address address);
+    /**
+     * Subscribes a {@code consumer} to an {@code address}, parsing values as 
{@code dataType}.
+     *
+     * @param consumer to be subscribed.
+     * @param address  to be read.
+     * @param dataType to be decoded.
+     * @param uuid     end-to-end reference
+     */
+    void subscribe(Consumer<PlcNotification<?>> consumer, Address address, 
Class<?> dataType, UUID uuid);
 
-    void unsubscribe(Consumer<PlcNotification> consumer);
+
+    /**
+     * Unsubscribes a {@code consumer}.
+     *
+     * @param consumer to be unsubscribed.
+     */
+    void unsubscribe(Consumer<PlcNotification<?>> consumer);
 }
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
index 149c097..e69788e 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
@@ -20,15 +20,56 @@ package org.apache.plc4x.java.api.messages;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
 
-public class PlcNotification {
+public class PlcNotification<T> {
 
-    private final Date timeStamp;
+    protected final Date timeStamp;
 
-    private final List<Object> values;
+    protected final List<T> values;
 
-    public PlcNotification(Date timeStamp, List<Object> values) {
+    protected final UUID uuid;
+
+    public PlcNotification(Date timeStamp, List<T> values, UUID uuid) {
         this.timeStamp = timeStamp;
         this.values = values;
+        this.uuid = uuid;
+    }
+
+    public Date getTimeStamp() {
+        return timeStamp;
+    }
+
+    public List<T> getValues() {
+        return values;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PlcNotification)) {
+            return false;
+        }
+        PlcNotification<?> that = (PlcNotification<?>) o;
+        return Objects.equals(timeStamp, that.timeStamp) &&
+            Objects.equals(values, that.values) &&
+            Objects.equals(uuid, that.uuid);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timeStamp, values, uuid);
+    }
+
+    @Override
+    public String toString() {
+        return "PlcNotification{" +
+            "timeStamp=" + timeStamp +
+            ", values=" + values +
+            ", uuid=" + uuid +
+            '}';
     }
 }
diff --git 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java
 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java
index a7d8baf..7cee6ad 100644
--- 
a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java
+++ 
b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/specific/TypeSafePlcReadRequest.java
@@ -28,7 +28,7 @@ import java.util.Optional;
 
 public class TypeSafePlcReadRequest<T> extends PlcReadRequest {
 
-    private Class<T> dataType;
+    private final Class<T> dataType;
 
     public TypeSafePlcReadRequest(Class<T> dataType) {
         Objects.requireNonNull(dataType, "data type must not be null");
diff --git 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
index 37bb3f3..f572731 100644
--- 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
+++ 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsAbstractPlcConnection.java
@@ -48,8 +48,8 @@ public abstract class AdsAbstractPlcConnection extends 
AbstractPlcConnection imp
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AdsAbstractPlcConnection.class);
 
-    private static final Configuration CONF = new SystemConfiguration();
-    private static final long SYMBOL_RESOLVE_TIMEOUT = 
CONF.getLong("plc4x.adsconnection.symbol.resolve,timeout", 3000);
+    protected static final Configuration CONF = new SystemConfiguration();
+    protected static final long SYMBOL_RESOLVE_TIMEOUT = 
CONF.getLong("plc4x.adsconnection.symbol.resolve,timeout", 3000);
 
     protected final AmsNetId targetAmsNetId;
 
@@ -139,7 +139,7 @@ public abstract class AdsAbstractPlcConnection extends 
AbstractPlcConnection imp
         return sendFuture;
     }
 
-    private void mapAddresses(PlcRequest<?> request) {
+    protected void mapAddresses(PlcRequest<?> request) {
         request.getRequestItems().stream()
             .parallel()
             .map(RequestItem::getAddress)
diff --git 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 66e58db..62b08fe 100644
--- 
a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ 
b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -22,40 +22,53 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
-import org.apache.plc4x.java.ads.api.commands.AdsDeviceNotificationRequest;
-import org.apache.plc4x.java.ads.api.commands.types.AdsNotificationSample;
-import org.apache.plc4x.java.ads.api.commands.types.AdsStampHeader;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.ads.api.commands.*;
+import org.apache.plc4x.java.ads.api.commands.types.*;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
-import org.apache.plc4x.java.ads.api.util.ByteValue;
+import org.apache.plc4x.java.ads.api.generic.types.Invoke;
+import org.apache.plc4x.java.ads.model.AdsAddress;
+import org.apache.plc4x.java.ads.model.SymbolicAdsAddress;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
+import org.apache.plc4x.java.ads.protocol.util.LittleEndianDecoder;
 import org.apache.plc4x.java.api.connection.PlcSubscriber;
+import org.apache.plc4x.java.api.exceptions.PlcProtocolException;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.messages.*;
+import org.apache.plc4x.java.api.messages.items.RequestItem;
 import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements 
PlcSubscriber {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AdsTcpPlcConnection.class);
+
     private static final int TCP_PORT = 48898;
 
+    private static final long ADD_DEVICE_TIMEOUT = 
CONF.getLong("plc4x.adsconnection.add.device,timeout", 3000);
+    private static final long DEL_DEVICE_TIMEOUT = 
CONF.getLong("plc4x.adsconnection.del.device,timeout", 3000);
+
     private static AtomicInteger localPorts = new AtomicInteger(30000);
 
-    private final Map<Consumer<PlcNotification>, 
Consumer<AdsDeviceNotificationRequest>> subscriberMap = new HashMap<>();
+    private final Map<Consumer<? extends PlcNotification>, 
Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle>> subscriberMap 
= new HashMap<>();
+
+    private final Map<NotificationHandle, Consumer<? extends PlcNotification>> 
handleConsumerMap = new HashMap<>();
 
     private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, 
AmsPort targetAmsPort) {
         this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), 
generateAMSPort());
@@ -120,29 +133,120 @@ public class AdsTcpPlcConnection extends 
AdsAbstractPlcConnection implements Plc
     }
 
     @Override
-    public void subscribe(Consumer<PlcNotification> consumer, Address address) 
{
-        Consumer<AdsDeviceNotificationRequest> 
adsDeviceNotificationRequestConsumer = adsDeviceNotificationRequest -> {
-            for (AdsStampHeader adsStampHeader : 
adsDeviceNotificationRequest.getAdsStampHeaders()) {
-                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
-                // TODO: where do we implement the mapping. Better move it 
into the ...
-                List<Object> values = 
adsStampHeader.getAdsNotificationSamples()
-                    .stream()
-                    .map(AdsNotificationSample::getData)
-                    .map(ByteValue::getBytes)
-                    .map(data -> (Object) data)
-                    .collect(Collectors.toList());
-                consumer.accept(new PlcNotification(timeStamp, values));
+    public void subscribe(Consumer<PlcNotification<?>> consumer, Address 
address, Class<?> dataType, UUID uuid) {
+        Objects.requireNonNull(consumer);
+        Objects.requireNonNull(address);
+        IndexGroup indexGroup;
+        IndexOffset indexOffset;
+        if (address instanceof SymbolicAdsAddress) {
+            mapAddresses(new PlcRequest() {
+                {
+                    requestItems.add(new RequestItem(Void.class, address) {
+                        // Not relevant
+                    });
+                }
+            });
+            AdsAddress adsAddress = addressMapping.get(address);
+            if (adsAddress == null) {
+                throw new PlcRuntimeException("Unresolvable address" + 
address);
             }
-        };
-        subscriberMap.put(consumer, adsDeviceNotificationRequestConsumer);
+            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+        } else if (address instanceof AdsAddress) {
+            AdsAddress adsAddress = (AdsAddress) address;
+            indexGroup = IndexGroup.of(adsAddress.getIndexGroup());
+            indexOffset = IndexOffset.of(adsAddress.getIndexOffset());
+        } else {
+            throw new IllegalArgumentException("Unssuported address type " + 
address.getClass());
+        }
+        AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = 
AdsAddDeviceNotificationRequest.of(
+            targetAmsNetId,
+            targetAmsPort,
+            sourceAmsNetId,
+            sourceAmsPort,
+            Invoke.NONE,
+            indexGroup,
+            indexOffset,
+            Length.of(1),
+            TransmissionMode.of(3),
+            MaxDelay.of(0),
+            CycleTime.of(4000000)
+        );
+
+        
CompletableFuture<PlcProprietaryResponse<AdsAddDeviceNotificationResponse>> 
addDeviceFuture = new CompletableFuture<>();
+        channel.writeAndFlush(new PlcRequestContainer<>(new 
PlcProprietaryRequest<>(adsAddDeviceNotificationRequest), addDeviceFuture));
+        PlcProprietaryResponse<AdsAddDeviceNotificationResponse> 
addDeviceResponse;
+        try {
+            addDeviceResponse = addDeviceFuture.get(ADD_DEVICE_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted!", e);
+            Thread.currentThread().interrupt();
+            throw new PlcRuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new PlcRuntimeException(e);
+        }
+        AdsAddDeviceNotificationResponse response = 
addDeviceResponse.getResponse();
+        if (response.getResult().toAdsReturnCode() != 
AdsReturnCode.ADS_CODE_0) {
+            throw new PlcRuntimeException("Non error code received " + 
response.getResult());
+        }
+        NotificationHandle notificationHandle = 
response.getNotificationHandle();
+        handleConsumerMap.put(notificationHandle, consumer);
+
+        Consumer<AdsDeviceNotificationRequest> 
adsDeviceNotificationRequestConsumer =
+            adsDeviceNotificationRequest -> 
adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
+                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
+
+                adsStampHeader.getAdsNotificationSamples()
+                    .forEach(adsNotificationSample -> {
+                        Consumer<? extends PlcNotification> 
plcNotificationConsumer = 
handleConsumerMap.get(adsNotificationSample.getNotificationHandle());
+                        if (plcNotificationConsumer == null) {
+                            LOGGER.warn("Unmapped notification received ", 
adsNotificationSample.getNotificationHandle());
+                            return;
+                        }
+                        Data data = adsNotificationSample.getData();
+                        try {
+                            consumer.accept(new PlcNotification<>(timeStamp, 
LittleEndianDecoder.decodeData(dataType, data.getBytes()), uuid));
+                        } catch (PlcProtocolException e) {
+                            LOGGER.error("Can't decode {}", data, e);
+                        }
+                    });
+            });
+        subscriberMap.put(consumer, 
Pair.of(adsDeviceNotificationRequestConsumer, notificationHandle));
         
getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
     }
 
     @Override
-    public void unsubscribe(Consumer<PlcNotification> consumer) {
-        Consumer<AdsDeviceNotificationRequest> 
adsDeviceNotificationRequestConsumer = subscriberMap.remove(consumer);
-        if (adsDeviceNotificationRequestConsumer != null) {
-            
getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(adsDeviceNotificationRequestConsumer);
+    public void unsubscribe(Consumer<PlcNotification<?>> consumer) {
+        Pair<Consumer<AdsDeviceNotificationRequest>, NotificationHandle> 
handlePair = subscriberMap.remove(consumer);
+        if (handlePair != null) {
+            NotificationHandle notificationHandle = handlePair.getRight();
+            AdsDeleteDeviceNotificationRequest 
adsDeleteDeviceNotificationRequest = AdsDeleteDeviceNotificationRequest.of(
+                targetAmsNetId,
+                targetAmsPort,
+                sourceAmsNetId,
+                sourceAmsPort,
+                Invoke.NONE,
+                notificationHandle
+            );
+            
CompletableFuture<PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> 
deleteDeviceFuture = new CompletableFuture<>();
+            channel.writeAndFlush(new PlcRequestContainer<>(new 
PlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), 
deleteDeviceFuture));
+
+            PlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> 
deleteDeviceResponse;
+            try {
+                deleteDeviceResponse = 
deleteDeviceFuture.get(DEL_DEVICE_TIMEOUT, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Interrupted!", e);
+                Thread.currentThread().interrupt();
+                throw new PlcRuntimeException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new PlcRuntimeException(e);
+            }
+            AdsDeleteDeviceNotificationResponse response = 
deleteDeviceResponse.getResponse();
+            if (response.getResult().toAdsReturnCode() != 
AdsReturnCode.ADS_CODE_0) {
+                throw new PlcRuntimeException("Non error code received " + 
response.getResult());
+            }
+
+            
getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(handlePair.getLeft());
         }
     }
 }

-- 
To stop receiving notification emails like this one, please contact
sru...@apache.org.

Reply via email to