This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch feature/notificationSupport in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 8de664f44c454634b993e736528311333f0e51e3 Merge: 365fabb 6e91cd7 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Thu May 24 09:40:59 2018 +0200 Merge branch 'master' into feature/notificationSupport # Conflicts: # plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java examples/iot-factory/README.adoc | 66 ++++++ examples/{kafka-bridge => iot-factory}/pom.xml | 59 +++--- .../iotfactory/IotElasticsearchFactory.java | 224 +++++++++++++++++++++ examples/iot-factory/src/main/resources/log4j2.xml | 35 ++++ .../iot-factory/src/main/resources/logback.xml | 40 ++++ examples/kafka-bridge/pom.xml | 4 + .../java/examples/kafkabridge/KafkaBridge.java | 67 ++++-- .../examples/kafkabridge/model/Configuration.java | 9 + .../model/{Address.java => PlcAddress.java} | 11 +- .../java/examples/kafkabridge/model/PlcConfig.java | 23 ++- .../model/{Address.java => PlcMemoryBlock.java} | 21 +- examples/pom.xml | 18 ++ .../java/ads/api/commands/types/IndexOffset.java | 2 + .../ads/connection/AdsAbstractPlcConnection.java | 90 +++++---- .../java/ads/connection/AdsTcpPlcConnection.java | 5 +- .../java/ads/protocol/Payload2SerialProtocol.java | 4 +- .../ads/protocol/util/LittleEndianDecoder.java | 50 +++-- .../ads/adslib/ADSClientNotificationExample.java | 6 +- .../java/ads/protocol/AbstractProtocolTest.java | 6 +- .../java/ads/protocol/ADSProtocolBenchmark.java | 8 +- .../org/apache/plc4x/java/s7/S7PlcScanner.java | 2 +- .../org/apache/plc4x/java/s7/S7PlcTestConsole.java | 2 +- src/site/asciidoc/developers/vpn.adoc | 33 +++ .../resources/img/plc4x-vpn-beckhoff-route-1.png | Bin 0 -> 99036 bytes .../resources/img/plc4x-vpn-beckhoff-route-2.png | Bin 0 -> 87524 bytes .../resources/img/plc4x-vpn-beckhoff-route-3.png | Bin 0 -> 185843 bytes 26 files changed, 622 insertions(+), 163 deletions(-) diff --cc plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java index 31895b1,825fed1..66e58db --- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java +++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java @@@ -40,20 -33,15 +40,23 @@@ import org.apache.plc4x.java.base.conne import java.net.Inet4Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import java.util.concurrent.atomic.AtomicInteger; -public class AdsTcpPlcConnection extends AdsAbstractPlcConnection { +public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements PlcSubscriber { private static final int TCP_PORT = 48898; + private static AtomicInteger localPorts = new AtomicInteger(30000); + + private final Map<Consumer<PlcNotification>, Consumer<AdsDeviceNotificationRequest>> subscriberMap = new HashMap<>(); + private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) { this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort()); } @@@ -113,33 -101,7 +116,33 @@@ } protected static AmsPort generateAMSPort() { - return AmsPort.of(TCP_PORT); + return AmsPort.of(localPorts.getAndIncrement()); } + @Override + public void subscribe(Consumer<PlcNotification> consumer, Address address) { + Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = adsDeviceNotificationRequest -> { + for (AdsStampHeader adsStampHeader : adsDeviceNotificationRequest.getAdsStampHeaders()) { + Date timeStamp = adsStampHeader.getTimeStamp().getAsDate(); + // TODO: where do we implement the mapping. Better move it into the ... + List<Object> values = adsStampHeader.getAdsNotificationSamples() + .stream() + .map(AdsNotificationSample::getData) + .map(ByteValue::getBytes) + .map(data -> (Object) data) + .collect(Collectors.toList()); + consumer.accept(new PlcNotification(timeStamp, values)); + } + }; + subscriberMap.put(consumer, adsDeviceNotificationRequestConsumer); + getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer); + } + + @Override + public void unsubscribe(Consumer<PlcNotification> consumer) { + Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = subscriberMap.remove(consumer); + if (adsDeviceNotificationRequestConsumer != null) { + getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(adsDeviceNotificationRequestConsumer); + } + } } -- To stop receiving notification emails like this one, please contact sru...@apache.org.