This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/notificationSupport
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 3a3703dc61809be6fbe150cc38fb75985c7d79ac
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Mon May 14 16:38:16 2018 +0200

    first draft implementing notification support
---
 .../plc4x/java/api/connection/PlcConnection.java   |  4 +-
 .../plc4x/java/api/connection/PlcSubscriber.java   | 34 +++++++++++++++++
 .../plc4x/java/api/messages/PlcNotification.java   | 34 +++++++++++++++++
 .../java/ads/connection/AdsTcpPlcConnection.java   | 43 +++++++++++++++++++++-
 .../plc4x/java/ads/protocol/Plc4x2AdsProtocol.java |  1 +
 .../base/connection/AbstractPlcConnection.java     | 14 ++++---
 6 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
index 47d3ff4..624f409 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcConnection.java
@@ -28,7 +28,7 @@ import java.util.Optional;
  * Interface defining the most basic methods a PLC4X connection should support.
  * This generally handles the connection establishment itself and the parsing of
  * address strings to the platform dependent Address instances.
- *
+ * <p>
  * The individual operations are then defined by other interfaces within this package.
  */
 public interface PlcConnection extends AutoCloseable {
@@ -70,4 +70,6 @@ public interface PlcConnection extends AutoCloseable {
 
     Optional<PlcWriter> getWriter();
 
+    Optional<PlcSubscriber> getSubscriber();
+
 }
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
new file mode 100644
index 0000000..5731654
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/connection/PlcSubscriber.java
@@ -0,0 +1,34 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.api.connection;
+
+import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.model.Address;
+
+import java.util.function.Consumer;
+
+/**
+ * Interface implemented by all PlcConnections that are able to receive notifications from remote resources.
+ */
+public interface PlcSubscriber {
+
+    void subscribe(Consumer<PlcNotification> consumer, Address address);
+
+    void unsubscribe(Consumer<PlcNotification> consumer);
+}
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
new file mode 100644
index 0000000..149c097
--- /dev/null
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcNotification.java
@@ -0,0 +1,34 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.api.messages;
+
+import java.util.Date;
+import java.util.List;
+
+public class PlcNotification {
+
+    private final Date timeStamp;
+
+    private final List<Object> values;
+
+    public PlcNotification(Date timeStamp, List<Object> values) {
+        this.timeStamp = timeStamp;
+        this.values = values;
+    }
+}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 505ab63..ddf2a6b 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@ -22,23 +22,38 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
+import org.apache.plc4x.java.ads.api.commands.AdsDeviceNotificationRequest;
+import org.apache.plc4x.java.ads.api.commands.types.AdsNotificationSample;
+import org.apache.plc4x.java.ads.api.commands.types.AdsStampHeader;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
+import org.apache.plc4x.java.ads.api.util.ByteValue;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2TcpProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
+import org.apache.plc4x.java.api.connection.PlcSubscriber;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcNotification;
+import org.apache.plc4x.java.api.model.Address;
 import org.apache.plc4x.java.base.connection.TcpSocketChannelFactory;
 
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
-public class AdsTcpPlcConnection extends AdsAbstractPlcConnection {
+public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements PlcSubscriber {
 
     private static final int TCP_PORT = 48898;
 
+    private final Map<Consumer<PlcNotification>, Consumer<AdsDeviceNotificationRequest>> subscriberMap = new HashMap<>();
+
     private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) {
         this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort());
     }
@@ -101,4 +116,30 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection {
         return AmsPort.of(TCP_PORT);
     }
 
+    @Override
+    public void subscribe(Consumer<PlcNotification> consumer, Address address) {
+        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = adsDeviceNotificationRequest -> {
+            for (AdsStampHeader adsStampHeader : adsDeviceNotificationRequest.getAdsStampHeaders()) {
+                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
+                // TODO: where do we implement the mapping. Better move it into the ...
+                List<Object> values = adsStampHeader.getAdsNotificationSamples()
+                    .stream()
+                    .map(AdsNotificationSample::getData)
+                    .map(ByteValue::getBytes)
+                    .map(data -> (Object) data)
+                    .collect(Collectors.toList());
+                consumer.accept(new PlcNotification(timeStamp, values));
+            }
+        };
+        subscriberMap.put(consumer, adsDeviceNotificationRequestConsumer);
+        getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
+    }
+
+    @Override
+    public void unsubscribe(Consumer<PlcNotification> consumer) {
+        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = subscriberMap.remove(consumer);
+        if (adsDeviceNotificationRequestConsumer != null) {
+            getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(adsDeviceNotificationRequestConsumer);
+        }
+    }
 }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
index c11c819..111acaf 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/Plc4x2AdsProtocol.java
@@ -214,6 +214,7 @@ public class Plc4x2AdsProtocol extends MessageToMessageCodec<AmsPacket, PlcReque
     }
 
     public boolean addConsumer(Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer) {
+        // TODO: we might need to add an AdsAddDeviceNotification
         return deviceNotificationListeners.add(adsDeviceNotificationRequestConsumer);
     }
 
diff --git a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
index 0af87d6..83f1540 100644
--- a/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
+++ b/plc4j/protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/connection/AbstractPlcConnection.java
@@ -20,10 +20,7 @@ package org.apache.plc4x.java.base.connection;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcLister;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.connection.*;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 
@@ -59,7 +56,7 @@ public abstract class AbstractPlcConnection implements PlcConnection {
             // Have the channel factory create a new channel instance.
             channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture));
             channel.closeFuture().addListener(future -> {
-                if(!sessionSetupCompleteFuture.isDone()) {
+                if (!sessionSetupCompleteFuture.isDone()) {
                     sessionSetupCompleteFuture.completeExceptionally(
                         new PlcIoException("Connection terminated by remote"));
                 }
@@ -127,4 +124,11 @@ public abstract class AbstractPlcConnection implements PlcConnection {
         return Optional.empty();
     }
 
+    @Override
+    public Optional<PlcSubscriber> getSubscriber() {
+        if (this instanceof PlcSubscriber) {
+            return Optional.of((PlcSubscriber) this);
+        }
+        return Optional.empty();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
sru...@apache.org.

Reply via email to