This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch feature/notificationSupport
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 8de664f44c454634b993e736528311333f0e51e3
Merge: 365fabb 6e91cd7
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Thu May 24 09:40:59 2018 +0200

    Merge branch 'master' into feature/notificationSupport
    
    # Conflicts:
    #   plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java

 examples/iot-factory/README.adoc                   |  66 ++++++
 examples/{kafka-bridge => iot-factory}/pom.xml     |  59 +++---
 .../iotfactory/IotElasticsearchFactory.java        | 224 +++++++++++++++++++++
 examples/iot-factory/src/main/resources/log4j2.xml |  35 ++++
 .../iot-factory/src/main/resources/logback.xml     |  40 ++++
 examples/kafka-bridge/pom.xml                      |   4 +
 .../java/examples/kafkabridge/KafkaBridge.java     |  67 ++++--
 .../examples/kafkabridge/model/Configuration.java  |   9 +
 .../model/{Address.java => PlcAddress.java}        |  11 +-
 .../java/examples/kafkabridge/model/PlcConfig.java |  23 ++-
 .../model/{Address.java => PlcMemoryBlock.java}    |  21 +-
 examples/pom.xml                                   |  18 ++
 .../java/ads/api/commands/types/IndexOffset.java   |   2 +
 .../ads/connection/AdsAbstractPlcConnection.java   |  90 +++++----
 .../java/ads/connection/AdsTcpPlcConnection.java   |   5 +-
 .../java/ads/protocol/Payload2SerialProtocol.java  |   4 +-
 .../ads/protocol/util/LittleEndianDecoder.java     |  50 +++--
 .../ads/adslib/ADSClientNotificationExample.java   |   6 +-
 .../java/ads/protocol/AbstractProtocolTest.java    |   6 +-
 .../java/ads/protocol/ADSProtocolBenchmark.java    |   8 +-
 .../org/apache/plc4x/java/s7/S7PlcScanner.java     |   2 +-
 .../org/apache/plc4x/java/s7/S7PlcTestConsole.java |   2 +-
 src/site/asciidoc/developers/vpn.adoc              |  33 +++
 .../resources/img/plc4x-vpn-beckhoff-route-1.png   | Bin 0 -> 99036 bytes
 .../resources/img/plc4x-vpn-beckhoff-route-2.png   | Bin 0 -> 87524 bytes
 .../resources/img/plc4x-vpn-beckhoff-route-3.png   | Bin 0 -> 185843 bytes
 26 files changed, 622 insertions(+), 163 deletions(-)

diff --cc plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
index 31895b1,825fed1..66e58db
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsTcpPlcConnection.java
@@@ -40,20 -33,15 +40,23 @@@ import org.apache.plc4x.java.base.conne
  import java.net.Inet4Address;
  import java.net.InetAddress;
  import java.net.UnknownHostException;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
  import java.util.concurrent.CompletableFuture;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
+ import java.util.concurrent.atomic.AtomicInteger;
  
 -public class AdsTcpPlcConnection extends AdsAbstractPlcConnection {
 +public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements PlcSubscriber {
  
      private static final int TCP_PORT = 48898;
  
+     private static AtomicInteger localPorts = new AtomicInteger(30000);
+ 
 +    private final Map<Consumer<PlcNotification>, Consumer<AdsDeviceNotificationRequest>> subscriberMap = new HashMap<>();
 +
      private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, AmsPort targetAmsPort) {
          this(address, targetAmsNetId, targetAmsPort, generateAMSNetId(), generateAMSPort());
      }
@@@ -113,33 -101,7 +116,33 @@@
      }
  
      protected static AmsPort generateAMSPort() {
-         return AmsPort.of(TCP_PORT);
+         return AmsPort.of(localPorts.getAndIncrement());
      }
  
 +    @Override
 +    public void subscribe(Consumer<PlcNotification> consumer, Address address) {
 +        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = adsDeviceNotificationRequest -> {
 +            for (AdsStampHeader adsStampHeader : adsDeviceNotificationRequest.getAdsStampHeaders()) {
 +                Date timeStamp = adsStampHeader.getTimeStamp().getAsDate();
 +                // TODO: where do we implement the mapping. Better move it into the ...
 +                List<Object> values = adsStampHeader.getAdsNotificationSamples()
 +                    .stream()
 +                    .map(AdsNotificationSample::getData)
 +                    .map(ByteValue::getBytes)
 +                    .map(data -> (Object) data)
 +                    .collect(Collectors.toList());
 +                consumer.accept(new PlcNotification(timeStamp, values));
 +            }
 +        };
 +        subscriberMap.put(consumer, adsDeviceNotificationRequestConsumer);
 +        getChannel().pipeline().get(Plc4x2AdsProtocol.class).addConsumer(adsDeviceNotificationRequestConsumer);
 +    }
 +
 +    @Override
 +    public void unsubscribe(Consumer<PlcNotification> consumer) {
 +        Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer = subscriberMap.remove(consumer);
 +        if (adsDeviceNotificationRequestConsumer != null) {
 +            getChannel().pipeline().get(Plc4x2AdsProtocol.class).removeConsumer(adsDeviceNotificationRequestConsumer);
 +        }
 +    }
  }

-- 
To stop receiving notification emails like this one, please contact
sru...@apache.org.

Reply via email to