This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/plc4go
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6581c3ab65c5c2e094ff71ac2922969cd94bc01c
Author: Christofer Dutz <christofer.d...@c-ware.de>
AuthorDate: Fri Nov 20 14:08:44 2020 +0100

    - Added a map of "attributes" to the PLC4Go Connection's Metadata (Which 
allows providing protocol specific additional information)
---
 plc4go/cmd/main/drivers/knxnetip_test.go           |   6 +
 .../internal/plc4go/knxnetip/KncNetIpConnection.go | 459 -------------------
 .../internal/plc4go/knxnetip/KnxNetIpConnection.go | 488 +++++++++++++++++++++
 plc4go/internal/plc4go/modbus/ModbusConnection.go  |   5 +
 plc4go/pkg/plc4go/model/plc_connection_metadata.go |   5 +
 5 files changed, 504 insertions(+), 459 deletions(-)

diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go 
b/plc4go/cmd/main/drivers/knxnetip_test.go
index aaf2268..ee996cc 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -66,6 +66,12 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
     }
     connection := connectionResult.Connection
 
+    attributes := connection.GetMetadata().GetConnectionAttributes()
+    fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX 
address '%s' got assigned client KNX address '%s'\n",
+        attributes["GatewayName"],
+        attributes["GatewayKnxAddress"],
+        attributes["ClientKnxAddress"])
+
     // Try to ping the remote device
     pingResultChannel := connection.Ping()
     pingResult := <-pingResultChannel
diff --git a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go 
b/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
deleted file mode 100644
index fcdd4d0..0000000
--- a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
+++ /dev/null
@@ -1,459 +0,0 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-package knxnetip
-
-import (
-       "bytes"
-       "errors"
-       "fmt"
-       driverModel 
"github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
-       internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/model"
-       "github.com/apache/plc4x/plc4go/internal/plc4go/spi"
-       "github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
-       "github.com/apache/plc4x/plc4go/internal/plc4go/transports"
-       "github.com/apache/plc4x/plc4go/internal/plc4go/transports/udp"
-       "github.com/apache/plc4x/plc4go/internal/plc4go/utils"
-       "github.com/apache/plc4x/plc4go/pkg/plc4go"
-       apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
-       "net"
-       "strconv"
-       "sync"
-       "time"
-)
-
-type ConnectionMetadata struct {
-       apiModel.PlcConnectionMetadata
-}
-
-func (m ConnectionMetadata) CanRead() bool {
-       return true
-}
-
-func (m ConnectionMetadata) CanWrite() bool {
-       return true
-}
-
-func (m ConnectionMetadata) CanSubscribe() bool {
-       return true
-}
-
-type KnxNetIpConnection struct {
-       messageCodec             spi.MessageCodec
-       options                  map[string][]string
-       fieldHandler             spi.PlcFieldHandler
-       valueHandler             spi.PlcValueHandler
-       quitConnectionStateTimer chan struct{}
-       subscribers              []*KnxNetIpSubscriber
-       leve3AddressCache        map[uint16]*driverModel.KnxGroupAddress3Level
-       leve2AddressCache        map[uint16]*driverModel.KnxGroupAddress2Level
-       leve1AddressCache        
map[uint16]*driverModel.KnxGroupAddressFreeLevel
-
-       valueCache      map[uint16][]int8
-       valueCacheMutex sync.RWMutex
-
-       GatewayKnxAddress      *driverModel.KnxAddress
-       GatewayName            string
-       ClientKnxAddress       *driverModel.KnxAddress
-       CommunicationChannelId uint8
-
-       requestInterceptor internalModel.RequestInterceptor
-       plc4go.PlcConnection
-}
-
-func NewKnxNetIpConnection(messageCodec spi.MessageCodec, options 
map[string][]string, fieldHandler spi.PlcFieldHandler) *KnxNetIpConnection {
-       return &KnxNetIpConnection{
-               messageCodec:       messageCodec,
-               options:            options,
-               fieldHandler:       fieldHandler,
-               valueHandler:       NewValueHandler(),
-               requestInterceptor: 
interceptors.NewSingleItemRequestInterceptor(),
-               subscribers:        []*KnxNetIpSubscriber{},
-               leve3AddressCache:  
map[uint16]*driverModel.KnxGroupAddress3Level{},
-               leve2AddressCache:  
map[uint16]*driverModel.KnxGroupAddress2Level{},
-               leve1AddressCache:  
map[uint16]*driverModel.KnxGroupAddressFreeLevel{},
-               valueCache:         map[uint16][]int8{},
-               valueCacheMutex:    sync.RWMutex{},
-       }
-}
-
-func (m *KnxNetIpConnection) Connect() <-chan 
plc4go.PlcConnectionConnectResult {
-       ch := make(chan plc4go.PlcConnectionConnectResult)
-       go func() {
-               err := m.messageCodec.Connect()
-               if err != nil {
-                       ch <- plc4go.NewPlcConnectionConnectResult(m, err)
-                       return
-               }
-
-               transportInstanceExposer, ok := 
m.messageCodec.(spi.TransportInstanceExposer)
-               if !ok {
-                       ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New(
-                               "used transport, is not a 
TransportInstanceExposer"))
-                       return
-               }
-
-               // Prepare a SearchReq
-               udpTransportInstance, ok := 
transportInstanceExposer.GetTransportInstance().(*udp.UdpTransportInstance)
-               if !ok {
-                       ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New(
-                               "used transport, is not a 
UdpTransportInstance"))
-                       return
-               }
-               localAddress := 
driverModel.NewIPAddress(utils.ByteToInt8(udpTransportInstance.LocalAddress.IP))
-               discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
-                       driverModel.HostProtocolCode_IPV4_UDP, localAddress, 
uint16(udpTransportInstance.LocalAddress.Port))
-               searchRequest := driverModel.NewSearchRequest(discoveryEndpoint)
-               // Send the SearchReq
-               err = m.messageCodec.Send(searchRequest)
-               if err != nil {
-                       ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New(
-                               "error sending search request"))
-                       return
-               }
-               // Register an expected response
-               check := func(response interface{}) (bool, bool) {
-                       searchResponse := 
driverModel.CastSearchResponse(response)
-                       return searchResponse != nil, false
-               }
-
-               // Create a channel for async execution of the connection
-               connectionResult := make(chan error)
-
-               // Register a callback to handle the response
-               searchResponseChan := m.messageCodec.Expect(check)
-               go func() {
-                       select {
-                       case response := <-searchResponseChan:
-                               searchResponse := 
driverModel.CastSearchResponse(response)
-                               // Check if this device supports tunneling 
services
-                               supportsTunneling := false
-                               for _, serviceId := range 
searchResponse.DibSuppSvcFamilies.ServiceIds {
-                                       _, ok := 
serviceId.Child.(*driverModel.KnxNetIpTunneling)
-                                       if ok {
-                                               supportsTunneling = true
-                                               break
-                                       }
-                               }
-                               if supportsTunneling {
-                                       // Save some important information
-                                       m.GatewayName = 
string(bytes.Trim(utils.Int8ToByte(
-                                               
searchResponse.DibDeviceInfo.DeviceFriendlyName), "\x00"))
-                                       m.GatewayKnxAddress = 
searchResponse.DibDeviceInfo.KnxAddress
-
-                                       // As soon as we got a successful 
search-response back, send a connection request.
-                                       localAddress := 
m.castIpToKnxAddress(udpTransportInstance.LocalAddress.IP)
-                                       connectionRequest := 
driverModel.NewConnectionRequest(
-                                               
driverModel.NewHPAIDiscoveryEndpoint(driverModel.HostProtocolCode_IPV4_UDP,
-                                                       localAddress, 
uint16(udpTransportInstance.LocalAddress.Port)),
-                                               
driverModel.NewHPAIDataEndpoint(driverModel.HostProtocolCode_IPV4_UDP,
-                                                       localAddress, 
uint16(udpTransportInstance.LocalAddress.Port)),
-                                               
driverModel.NewConnectionRequestInformationTunnelConnection(driverModel.KnxLayer_TUNNEL_LINK_LAYER),
-                                       )
-
-                                       // Send the connection request
-                                       err = 
m.messageCodec.Send(connectionRequest)
-                                       if err != nil {
-                                               // TODO: Different channel ...
-                                               ch <- 
plc4go.NewPlcConnectionConnectResult(m, errors.New(
-                                                       "error sending 
connection request"))
-                                               return
-                                       }
-                                       // Register an expected response
-                                       check := func(response interface{}) 
(bool, bool) {
-                                               connectionResponse := 
driverModel.CastConnectionResponse(response)
-                                               return connectionResponse != 
nil, false
-                                       }
-
-                                       // Register a callback to handle the 
response
-                                       connectionResponseChan := 
m.messageCodec.Expect(check)
-                                       go func() {
-                                               response := 
<-connectionResponseChan
-                                               connectionResponse := 
driverModel.CastConnectionResponse(response)
-                                               // Save the communication 
channel id
-                                               m.CommunicationChannelId = 
connectionResponse.CommunicationChannelId
-                                               if connectionResponse.Status == 
driverModel.Status_NO_ERROR {
-                                                       // Register a listener 
for incoming tunneling requests
-                                                       checkTunnelReq := 
func(response interface{}) (bool, bool) {
-                                                               
tunnelingRequest := driverModel.CastTunnelingRequest(response)
-                                                               return 
(tunnelingRequest != nil) &&
-                                                                               
(tunnelingRequest.TunnelingRequestDataBlock.CommunicationChannelId == 
m.CommunicationChannelId),
-                                                                       true
-                                                       }
-                                                       tunnelingRequestChan := 
m.messageCodec.Expect(checkTunnelReq)
-                                                       go func() {
-                                                               
m.handleIncomingTunnelingRequest(tunnelingRequestChan)
-                                                       }()
-
-                                                       
tunnelConnectionDataBlock := 
connectionResponse.ConnectionResponseDataBlock.Child.(*driverModel.ConnectionResponseDataBlockTunnelConnection)
-                                                       // Save the KNX Address 
the Gateway assigned to this connection.
-                                                       m.ClientKnxAddress = 
tunnelConnectionDataBlock.KnxAddress
-
-                                                       
fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX address 
'%d.%d.%d' got assigned client KNX address '%d.%d.%d'\n",
-                                                               m.GatewayName,
-                                                               
m.GatewayKnxAddress.MainGroup, m.GatewayKnxAddress.MiddleGroup, 
m.GatewayKnxAddress.SubGroup,
-                                                               
m.ClientKnxAddress.MainGroup, m.ClientKnxAddress.MiddleGroup, 
m.ClientKnxAddress.SubGroup)
-
-                                                       // Fire the "connected" 
event
-                                                       ch <- 
plc4go.NewPlcConnectionConnectResult(m, nil)
-
-                                                       // Start a timer that 
sends connection-state requests every 60 seconds
-                                                       connectionStateTimer := 
time.NewTicker(60 * time.Second)
-                                                       
m.quitConnectionStateTimer = make(chan struct{})
-                                                       go func() {
-                                                               for {
-                                                                       select {
-                                                                       case 
<-connectionStateTimer.C:
-                                                                               
// We're using the connection-state-request as ping operation ...
-                                                                               
ping := m.Ping()
-                                                                               
pingResult := <-ping
-                                                                               
if pingResult.Err != nil {
-                                                                               
        // TODO: Do some error handling here ...
-                                                                               
        connectionStateTimer.Stop()
-                                                                               
}
-                                                                       case 
<-m.quitConnectionStateTimer:
-                                                                               
// TODO: Do some error handling here ...
-                                                                               
connectionStateTimer.Stop()
-                                                                               
return
-                                                                       }
-                                                               }
-                                                       }()
-                                               } else {
-                                                       ch <- 
plc4go.NewPlcConnectionConnectResult(m,
-                                                               errors.New("got 
a connection response with status 
"+strconv.Itoa(int(connectionResponse.Status))))
-                                               }
-                                       }()
-                               } else {
-                                       ch <- 
plc4go.NewPlcConnectionConnectResult(m,
-                                               errors.New("this connection 
doesn't support tunneling"))
-                               }
-                       case <-time.After(1 * time.Second):
-                               ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New("received timeout"))
-                       }
-               }()
-               // Wait for the connection to be established
-               err = <-connectionResult
-               ch <- plc4go.NewPlcConnectionConnectResult(m, err)
-       }()
-       return ch
-}
-
-func (m *KnxNetIpConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
-       // TODO: Implement ...
-       ch := make(chan plc4go.PlcConnectionCloseResult)
-       go func() {
-               ch <- plc4go.NewPlcConnectionCloseResult(m, nil)
-       }()
-       return ch
-}
-
-func (m *KnxNetIpConnection) IsConnected() bool {
-       panic("implement me")
-}
-
-func (m *KnxNetIpConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
-       result := make(chan plc4go.PlcConnectionPingResult)
-       //      diagnosticRequestPdu := 
driverModel.NewModbusPDUDiagnosticRequest(0, 0x42)
-       go func() {
-               transportInstanceExposer, ok := 
m.messageCodec.(spi.TransportInstanceExposer)
-               if !ok {
-                       result <- plc4go.NewPlcConnectionPingResult(errors.New(
-                               "used transport, is not a 
TransportInstanceExposer"))
-                       return
-               }
-
-               // Prepare a SearchReq
-               udpTransportInstance, ok := 
transportInstanceExposer.GetTransportInstance().(*udp.UdpTransportInstance)
-               if !ok {
-                       result <- plc4go.NewPlcConnectionPingResult(errors.New(
-                               "used transport, is not a 
UdpTransportInstance"))
-                       return
-               }
-
-               localAddress := 
m.castIpToKnxAddress(udpTransportInstance.LocalAddress.IP)
-
-               connectionStateRequest := driverModel.NewConnectionStateRequest(
-                       m.CommunicationChannelId,
-                       driverModel.NewHPAIControlEndpoint(
-                               driverModel.HostProtocolCode_IPV4_UDP,
-                               localAddress, 
uint16(udpTransportInstance.LocalAddress.Port)))
-
-               // Send the connection state request
-               err := m.messageCodec.Send(connectionStateRequest)
-               if err != nil {
-                       result <- plc4go.NewPlcConnectionPingResult(err)
-                       return
-               }
-               // Register an expected response
-               check := func(response interface{}) (bool, bool) {
-                       connectionStateResponse := 
driverModel.CastConnectionStateResponse(response)
-                       return connectionStateResponse != nil, false
-               }
-
-               // Register a callback to handle the response
-               connectionStateResponseChan := m.messageCodec.Expect(check)
-               go func() {
-                       response := <-connectionStateResponseChan
-                       connectionStateResponse := 
driverModel.CastConnectionStateResponse(response)
-                       if connectionStateResponse.Status != 
driverModel.Status_NO_ERROR {
-                               result <- 
plc4go.NewPlcConnectionPingResult(errors.New(
-                                       "got a failure response code " + 
strconv.Itoa(int(connectionStateResponse.Status))))
-                       } else {
-                               result <- plc4go.NewPlcConnectionPingResult(nil)
-                       }
-               }()
-       }()
-       return result
-}
-
-func (m *KnxNetIpConnection) GetMetadata() apiModel.PlcConnectionMetadata {
-       return ConnectionMetadata{}
-}
-
-func (m *KnxNetIpConnection) ReadRequestBuilder() 
apiModel.PlcReadRequestBuilder {
-       return internalModel.NewDefaultPlcReadRequestBuilder(
-               m.fieldHandler, NewKnxNetIpReader(m))
-}
-
-func (m *KnxNetIpConnection) WriteRequestBuilder() 
apiModel.PlcWriteRequestBuilder {
-       return internalModel.NewDefaultPlcWriteRequestBuilder(
-               m.fieldHandler, m.valueHandler, 
NewKnxNetIpWriter(m.messageCodec))
-}
-
-func (m *KnxNetIpConnection) SubscriptionRequestBuilder() 
apiModel.PlcSubscriptionRequestBuilder {
-       return internalModel.NewDefaultPlcSubscriptionRequestBuilder(
-               m.fieldHandler, m.valueHandler, NewKnxNetIpSubscriber(m))
-}
-
-func (m *KnxNetIpConnection) UnsubscriptionRequestBuilder() 
apiModel.PlcUnsubscriptionRequestBuilder {
-       return nil /*internalModel.NewDefaultPlcUnsubscriptionRequestBuilder(
-         m.fieldHandler, m.valueHandler, 
NewKnxNetIpSubscriber(m.messageCodec))*/
-}
-
-func (m *KnxNetIpConnection) GetTransportInstance() 
transports.TransportInstance {
-       if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok {
-               return mc.GetTransportInstance()
-       }
-       return nil
-}
-
-func (m *KnxNetIpConnection) GetPlcFieldHandler() spi.PlcFieldHandler {
-       return m.fieldHandler
-}
-
-func (m *KnxNetIpConnection) GetPlcValueHandler() spi.PlcValueHandler {
-       return m.valueHandler
-}
-
-func (m *KnxNetIpConnection) castIpToKnxAddress(ip net.IP) 
*driverModel.IPAddress {
-       return driverModel.NewIPAddress(utils.ByteToInt8(ip)[len(ip)-4:])
-}
-
-func (m *KnxNetIpConnection) 
handleIncomingTunnelingRequest(tunnelingRequestChan chan interface{}) {
-       for {
-               msg := <-tunnelingRequestChan
-               tunnelingRequest := driverModel.CastTunnelingRequest(msg)
-               // Send a response for this message
-               tunnelingResponse := 
driverModel.NewTunnelingResponse(driverModel.NewTunnelingResponseDataBlock(
-                       m.CommunicationChannelId, 
tunnelingRequest.TunnelingRequestDataBlock.SequenceCounter,
-                       driverModel.Status_NO_ERROR))
-               err := m.messageCodec.Send(tunnelingResponse)
-               if err != nil {
-                       // TODO: Somehow react on this ...
-                       break
-               }
-
-               go func() {
-                       cemiDataInd := 
driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
-                       if cemiDataInd != nil {
-                               addressData := 
uint16(cemiDataInd.CemiDataFrame.DestinationAddress[0])<<8 | 
(uint16(cemiDataInd.CemiDataFrame.DestinationAddress[1]) & 0xFF)
-                               m.valueCacheMutex.RLock()
-                               val, ok := m.valueCache[addressData]
-                               m.valueCacheMutex.RUnlock()
-                               changed := false
-                               var payload []int8
-                               payload = append(payload, 
cemiDataInd.CemiDataFrame.DataFirstByte)
-                               payload = append(payload, 
cemiDataInd.CemiDataFrame.Data...)
-                               if !ok || !m.sliceEqual(val, payload) {
-                                       m.valueCacheMutex.Lock()
-                                       m.valueCache[addressData] = payload
-                                       m.valueCacheMutex.Unlock()
-                                       // If this is a new value, we have to 
also provide the 3 different types of addresses.
-                                       if !ok {
-                                               destinationAddress := 
cemiDataInd.CemiDataFrame.DestinationAddress
-                                               arb := 
utils.NewReadBuffer(utils.Int8ToUint8(destinationAddress))
-                                               if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 3); err2 == nil {
-                                                       
m.leve3AddressCache[addressData] = 
driverModel.CastKnxGroupAddress3Level(address)
-                                               }
-                                               arb.Reset()
-                                               if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 2); err2 == nil {
-                                                       
m.leve2AddressCache[addressData] = 
driverModel.CastKnxGroupAddress2Level(address)
-                                               }
-                                               arb.Reset()
-                                               if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 1); err2 == nil {
-                                                       
m.leve1AddressCache[addressData] = 
driverModel.CastKnxGroupAddressFreeLevel(address)
-                                               }
-                                       }
-                                       changed = true
-                               }
-                               for _, subscriber := range m.subscribers {
-                                       
subscriber.handleValueChange(cemiDataInd.CemiDataFrame, changed)
-                               }
-                       }
-               }()
-       }
-}
-
-func (m *KnxNetIpConnection) getGroupAddressNumLevels() uint8 {
-       if val, ok := m.options["group-address-num-levels"]; ok {
-               groupAddressNumLevels, err := strconv.Atoi(val[0])
-               if err == nil {
-                       return uint8(groupAddressNumLevels)
-               }
-       }
-       return 3
-}
-
-func (m *KnxNetIpConnection) addSubscriber(subscriber *KnxNetIpSubscriber) {
-       for _, sub := range m.subscribers {
-               if sub == subscriber {
-                       return
-               }
-       }
-       m.subscribers = append(m.subscribers, subscriber)
-}
-
-func (m *KnxNetIpConnection) removeSubscriber(subscriber *KnxNetIpSubscriber) {
-       for i, sub := range m.subscribers {
-               if sub == subscriber {
-                       m.subscribers = append(m.subscribers[:i], 
m.subscribers[i+1:]...)
-               }
-       }
-}
-
-func (m *KnxNetIpConnection) sliceEqual(a, b []int8) bool {
-       if len(a) != len(b) {
-               return false
-       }
-       for i, v := range a {
-               if v != b[i] {
-                       return false
-               }
-       }
-       return true
-}
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go 
b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
new file mode 100644
index 0000000..d477016
--- /dev/null
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -0,0 +1,488 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package knxnetip
+
+import (
+    "bytes"
+    "errors"
+    driverModel 
"github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
+    internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/model"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/spi"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/transports"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/transports/udp"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/utils"
+    "github.com/apache/plc4x/plc4go/pkg/plc4go"
+    apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+    "net"
+    "reflect"
+    "strconv"
+    "sync"
+    "time"
+)
+
+type ConnectionMetadata struct {
+    KnxMedium         string
+    GatewayName       string
+    GatewayKnxAddress string
+    ClientKnxAddress  string
+
+    ProjectNumber          uint8
+    InstallationNumber     uint8
+    DeviceSerialNumber     []int8
+    DeviceMulticastAddress []int8
+    DeviceMacAddress       []int8
+    SupportedServices      []string
+
+    apiModel.PlcConnectionMetadata
+}
+
+func (m ConnectionMetadata) GetConnectionAttributes() map[string]string {
+    return map[string]string{
+        "GatewayName":       m.GatewayName,
+        "GatewayKnxAddress": m.GatewayKnxAddress,
+        "ClientKnxAddress":  m.ClientKnxAddress,
+    }
+}
+
+func (m ConnectionMetadata) CanRead() bool {
+    return true
+}
+
+func (m ConnectionMetadata) CanWrite() bool {
+    return true
+}
+
+func (m ConnectionMetadata) CanSubscribe() bool {
+    return true
+}
+
+type KnxNetIpConnection struct {
+    messageCodec             spi.MessageCodec
+    options                  map[string][]string
+    fieldHandler             spi.PlcFieldHandler
+    valueHandler             spi.PlcValueHandler
+    quitConnectionStateTimer chan struct{}
+    subscribers              []*KnxNetIpSubscriber
+    leve3AddressCache        map[uint16]*driverModel.KnxGroupAddress3Level
+    leve2AddressCache        map[uint16]*driverModel.KnxGroupAddress2Level
+    leve1AddressCache        map[uint16]*driverModel.KnxGroupAddressFreeLevel
+
+    valueCache      map[uint16][]int8
+    valueCacheMutex sync.RWMutex
+    metadata        *ConnectionMetadata
+
+    GatewayKnxAddress      *driverModel.KnxAddress
+    ClientKnxAddress       *driverModel.KnxAddress
+    CommunicationChannelId uint8
+
+    requestInterceptor internalModel.RequestInterceptor
+    plc4go.PlcConnection
+}
+
+func NewKnxNetIpConnection(messageCodec spi.MessageCodec, options 
map[string][]string, fieldHandler spi.PlcFieldHandler) *KnxNetIpConnection {
+    return &KnxNetIpConnection{
+        messageCodec:       messageCodec,
+        options:            options,
+        fieldHandler:       fieldHandler,
+        valueHandler:       NewValueHandler(),
+        requestInterceptor: interceptors.NewSingleItemRequestInterceptor(),
+        subscribers:        []*KnxNetIpSubscriber{},
+        leve3AddressCache:  map[uint16]*driverModel.KnxGroupAddress3Level{},
+        leve2AddressCache:  map[uint16]*driverModel.KnxGroupAddress2Level{},
+        leve1AddressCache:  map[uint16]*driverModel.KnxGroupAddressFreeLevel{},
+        valueCache:         map[uint16][]int8{},
+        valueCacheMutex:    sync.RWMutex{},
+        metadata:           &ConnectionMetadata{},
+    }
+}
+
+func (m *KnxNetIpConnection) Connect() <-chan 
plc4go.PlcConnectionConnectResult {
+    ch := make(chan plc4go.PlcConnectionConnectResult)
+    go func() {
+        err := m.messageCodec.Connect()
+        if err != nil {
+            ch <- plc4go.NewPlcConnectionConnectResult(m, err)
+            return
+        }
+
+        transportInstanceExposer, ok := 
m.messageCodec.(spi.TransportInstanceExposer)
+        if !ok {
+            ch <- plc4go.NewPlcConnectionConnectResult(m, errors.New(
+                "used transport, is not a TransportInstanceExposer"))
+            return
+        }
+
+        // Prepare a SearchReq
+        udpTransportInstance, ok := 
transportInstanceExposer.GetTransportInstance().(*udp.UdpTransportInstance)
+        if !ok {
+            ch <- plc4go.NewPlcConnectionConnectResult(m, errors.New(
+                "used transport, is not a UdpTransportInstance"))
+            return
+        }
+        localAddress := 
driverModel.NewIPAddress(utils.ByteToInt8(udpTransportInstance.LocalAddress.IP))
+        discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
+            driverModel.HostProtocolCode_IPV4_UDP, localAddress, 
uint16(udpTransportInstance.LocalAddress.Port))
+        searchRequest := driverModel.NewSearchRequest(discoveryEndpoint)
+        // Send the SearchReq
+        err = m.messageCodec.Send(searchRequest)
+        if err != nil {
+            ch <- plc4go.NewPlcConnectionConnectResult(m, errors.New(
+                "error sending search request"))
+            return
+        }
+        // Register an expected response
+        check := func(response interface{}) (bool, bool) {
+            searchResponse := driverModel.CastSearchResponse(response)
+            return searchResponse != nil, false
+        }
+
+        // Create a channel for async execution of the connection
+        connectionResult := make(chan error)
+
+        // Register a callback to handle the response
+        searchResponseChan := m.messageCodec.Expect(check)
+        go func() {
+            select {
+            case response := <-searchResponseChan:
+                searchResponse := driverModel.CastSearchResponse(response)
+                // Check if this device supports tunneling services
+                supportsTunneling := false
+                for _, serviceId := range 
searchResponse.DibSuppSvcFamilies.ServiceIds {
+                    _, ok := serviceId.Child.(*driverModel.KnxNetIpTunneling)
+                    if ok {
+                        supportsTunneling = true
+                        break
+                    }
+                }
+                if supportsTunneling {
+                    // Save some important information
+                    m.metadata.GatewayName = 
string(bytes.Trim(utils.Int8ToByte(
+                        searchResponse.DibDeviceInfo.DeviceFriendlyName), 
"\x00"))
+                    m.metadata.ProjectNumber = 
searchResponse.DibDeviceInfo.ProjectInstallationIdentifier.ProjectNumber
+                    m.metadata.InstallationNumber = 
searchResponse.DibDeviceInfo.ProjectInstallationIdentifier.InstallationNumber
+                    m.metadata.DeviceSerialNumber = 
searchResponse.DibDeviceInfo.KnxNetIpDeviceSerialNumber
+                    m.metadata.DeviceMulticastAddress = 
searchResponse.DibDeviceInfo.KnxNetIpDeviceMulticastAddress.Addr
+                    m.metadata.DeviceMacAddress = 
searchResponse.DibDeviceInfo.KnxNetIpDeviceMacAddress.Addr
+                    m.metadata.SupportedServices = []string{}
+                    for _, serviceId := range 
searchResponse.DibSuppSvcFamilies.ServiceIds {
+                        m.metadata.SupportedServices = 
append(m.metadata.SupportedServices, reflect.TypeOf(serviceId).Name())
+                    }
+                    m.GatewayKnxAddress = 
searchResponse.DibDeviceInfo.KnxAddress
+
+                    // As soon as we got a successful search-response back, 
send a connection request.
+                    localAddress := 
m.castIpToKnxAddress(udpTransportInstance.LocalAddress.IP)
+                    connectionRequest := driverModel.NewConnectionRequest(
+                        
driverModel.NewHPAIDiscoveryEndpoint(driverModel.HostProtocolCode_IPV4_UDP,
+                            localAddress, 
uint16(udpTransportInstance.LocalAddress.Port)),
+                        
driverModel.NewHPAIDataEndpoint(driverModel.HostProtocolCode_IPV4_UDP,
+                            localAddress, 
uint16(udpTransportInstance.LocalAddress.Port)),
+                        
driverModel.NewConnectionRequestInformationTunnelConnection(driverModel.KnxLayer_TUNNEL_LINK_LAYER),
+                    )
+
+                    // Send the connection request
+                    err = m.messageCodec.Send(connectionRequest)
+                    if err != nil {
+                        // TODO: Different channel ...
+                        ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New(
+                            "error sending connection request"))
+                        return
+                    }
+                    // Register an expected response
+                    check := func(response interface{}) (bool, bool) {
+                        connectionResponse := 
driverModel.CastConnectionResponse(response)
+                        return connectionResponse != nil, false
+                    }
+
+                    // Register a callback to handle the response
+                    connectionResponseChan := m.messageCodec.Expect(check)
+                    go func() {
+                        response := <-connectionResponseChan
+                        connectionResponse := 
driverModel.CastConnectionResponse(response)
+                        // Save the communication channel id
+                        m.CommunicationChannelId = 
connectionResponse.CommunicationChannelId
+                        if connectionResponse.Status == 
driverModel.Status_NO_ERROR {
+                            // Register a listener for incoming tunneling 
requests
+                            checkTunnelReq := func(response interface{}) 
(bool, bool) {
+                                tunnelingRequest := 
driverModel.CastTunnelingRequest(response)
+                                return (tunnelingRequest != nil) &&
+                                        
(tunnelingRequest.TunnelingRequestDataBlock.CommunicationChannelId == 
m.CommunicationChannelId),
+                                    true
+                            }
+                            tunnelingRequestChan := 
m.messageCodec.Expect(checkTunnelReq)
+                            go func() {
+                                
m.handleIncomingTunnelingRequest(tunnelingRequestChan)
+                            }()
+
+                            tunnelConnectionDataBlock := 
connectionResponse.ConnectionResponseDataBlock.Child.(*driverModel.ConnectionResponseDataBlockTunnelConnection)
+                            // Save the KNX Address the Gateway assigned to 
this connection.
+                            m.ClientKnxAddress = 
tunnelConnectionDataBlock.KnxAddress
+
+                            // Fire the "connected" event
+                            ch <- plc4go.NewPlcConnectionConnectResult(m, nil)
+
+                            // Start a timer that sends connection-state 
requests every 60 seconds
+                            connectionStateTimer := time.NewTicker(60 * 
time.Second)
+                            m.quitConnectionStateTimer = make(chan struct{})
+                            go func() {
+                                for {
+                                    select {
+                                    case <-connectionStateTimer.C:
+                                        // We're using the 
connection-state-request as ping operation ...
+                                        ping := m.Ping()
+                                        pingResult := <-ping
+                                        if pingResult.Err != nil {
+                                            // TODO: Do some error handling 
here ...
+                                            connectionStateTimer.Stop()
+                                        }
+                                    case <-m.quitConnectionStateTimer:
+                                        // TODO: Do some error handling here 
...
+                                        connectionStateTimer.Stop()
+                                        return
+                                    }
+                                }
+                            }()
+                        } else {
+                            ch <- plc4go.NewPlcConnectionConnectResult(m,
+                                errors.New("got a connection response with 
status "+strconv.Itoa(int(connectionResponse.Status))))
+                        }
+                    }()
+                } else {
+                    ch <- plc4go.NewPlcConnectionConnectResult(m,
+                        errors.New("this connection doesn't support 
tunneling"))
+                }
+            case <-time.After(1 * time.Second):
+                ch <- plc4go.NewPlcConnectionConnectResult(m, 
errors.New("request timed out"))
+            }
+        }()
+        // Wait for the connection to be established
+        err = <-connectionResult
+        ch <- plc4go.NewPlcConnectionConnectResult(m, err)
+    }()
+    return ch
+}
+
+func (m *KnxNetIpConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
+    // TODO: Implement ...
+    ch := make(chan plc4go.PlcConnectionCloseResult)
+    go func() {
+        ch <- plc4go.NewPlcConnectionCloseResult(m, nil)
+    }()
+    return ch
+}
+
+func (m *KnxNetIpConnection) IsConnected() bool {
+    panic("implement me")
+}
+
+func (m *KnxNetIpConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
+    result := make(chan plc4go.PlcConnectionPingResult)
+    // diagnosticRequestPdu := driverModel.NewModbusPDUDiagnosticRequest(0, 
0x42)
+    go func() {
+        transportInstanceExposer, ok := 
m.messageCodec.(spi.TransportInstanceExposer)
+        if !ok {
+            result <- plc4go.NewPlcConnectionPingResult(errors.New(
+                "used transport, is not a TransportInstanceExposer"))
+            return
+        }
+
+        // Prepare a SearchReq
+        udpTransportInstance, ok := 
transportInstanceExposer.GetTransportInstance().(*udp.UdpTransportInstance)
+        if !ok {
+            result <- plc4go.NewPlcConnectionPingResult(errors.New(
+                "used transport, is not a UdpTransportInstance"))
+            return
+        }
+
+        localAddress := 
m.castIpToKnxAddress(udpTransportInstance.LocalAddress.IP)
+
+        connectionStateRequest := driverModel.NewConnectionStateRequest(
+            m.CommunicationChannelId,
+            driverModel.NewHPAIControlEndpoint(
+                driverModel.HostProtocolCode_IPV4_UDP,
+                localAddress, uint16(udpTransportInstance.LocalAddress.Port)))
+
+        // Send the connection state request
+        err := m.messageCodec.Send(connectionStateRequest)
+        if err != nil {
+            result <- plc4go.NewPlcConnectionPingResult(err)
+            return
+        }
+        // Register an expected response
+        check := func(response interface{}) (bool, bool) {
+            connectionStateResponse := 
driverModel.CastConnectionStateResponse(response)
+            return connectionStateResponse != nil, false
+        }
+
+        // Register a callback to handle the response
+        connectionStateResponseChan := m.messageCodec.Expect(check)
+        go func() {
+            response := <-connectionStateResponseChan
+            connectionStateResponse := 
driverModel.CastConnectionStateResponse(response)
+            if connectionStateResponse.Status != driverModel.Status_NO_ERROR {
+                result <- plc4go.NewPlcConnectionPingResult(errors.New(
+                    "got a failure response code " + 
strconv.Itoa(int(connectionStateResponse.Status))))
+            } else {
+                result <- plc4go.NewPlcConnectionPingResult(nil)
+            }
+        }()
+    }()
+    return result
+}
+
+func (m *KnxNetIpConnection) GetMetadata() apiModel.PlcConnectionMetadata {
+    return m.metadata
+}
+
+func (m *KnxNetIpConnection) ReadRequestBuilder() 
apiModel.PlcReadRequestBuilder {
+    return internalModel.NewDefaultPlcReadRequestBuilder(
+        m.fieldHandler, NewKnxNetIpReader(m))
+}
+
+func (m *KnxNetIpConnection) WriteRequestBuilder() 
apiModel.PlcWriteRequestBuilder {
+    return internalModel.NewDefaultPlcWriteRequestBuilder(
+        m.fieldHandler, m.valueHandler, NewKnxNetIpWriter(m.messageCodec))
+}
+
+func (m *KnxNetIpConnection) SubscriptionRequestBuilder() 
apiModel.PlcSubscriptionRequestBuilder {
+    return internalModel.NewDefaultPlcSubscriptionRequestBuilder(
+        m.fieldHandler, m.valueHandler, NewKnxNetIpSubscriber(m))
+}
+
+func (m *KnxNetIpConnection) UnsubscriptionRequestBuilder() 
apiModel.PlcUnsubscriptionRequestBuilder {
+    return nil /*internalModel.NewDefaultPlcUnsubscriptionRequestBuilder(
+      m.fieldHandler, m.valueHandler, NewKnxNetIpSubscriber(m.messageCodec))*/
+}
+
+func (m *KnxNetIpConnection) GetTransportInstance() 
transports.TransportInstance {
+    if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok {
+        return mc.GetTransportInstance()
+    }
+    return nil
+}
+
+func (m *KnxNetIpConnection) GetPlcFieldHandler() spi.PlcFieldHandler {
+    return m.fieldHandler
+}
+
+func (m *KnxNetIpConnection) GetPlcValueHandler() spi.PlcValueHandler {
+    return m.valueHandler
+}
+
+func (m *KnxNetIpConnection) castIpToKnxAddress(ip net.IP) 
*driverModel.IPAddress {
+    return driverModel.NewIPAddress(utils.ByteToInt8(ip)[len(ip)-4:])
+}
+
+func (m *KnxNetIpConnection) 
handleIncomingTunnelingRequest(tunnelingRequestChan chan interface{}) {
+    for {
+        msg := <-tunnelingRequestChan
+        tunnelingRequest := driverModel.CastTunnelingRequest(msg)
+        // Send a response for this message
+        tunnelingResponse := 
driverModel.NewTunnelingResponse(driverModel.NewTunnelingResponseDataBlock(
+            m.CommunicationChannelId, 
tunnelingRequest.TunnelingRequestDataBlock.SequenceCounter,
+            driverModel.Status_NO_ERROR))
+        err := m.messageCodec.Send(tunnelingResponse)
+        if err != nil {
+            // TODO: Somehow react on this ...
+            break
+        }
+
+        go func() {
+            cemiDataInd := 
driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
+            if cemiDataInd != nil {
+                addressData := 
uint16(cemiDataInd.CemiDataFrame.DestinationAddress[0])<<8 | 
(uint16(cemiDataInd.CemiDataFrame.DestinationAddress[1]) & 0xFF)
+                m.valueCacheMutex.RLock()
+                val, ok := m.valueCache[addressData]
+                m.valueCacheMutex.RUnlock()
+                changed := false
+                var payload []int8
+                payload = append(payload, 
cemiDataInd.CemiDataFrame.DataFirstByte)
+                payload = append(payload, cemiDataInd.CemiDataFrame.Data...)
+                if !ok || !m.sliceEqual(val, payload) {
+                    m.valueCacheMutex.Lock()
+                    m.valueCache[addressData] = payload
+                    m.valueCacheMutex.Unlock()
+                    // If this is a new value, we have to also provide the 3 
different types of addresses.
+                    if !ok {
+                        destinationAddress := 
cemiDataInd.CemiDataFrame.DestinationAddress
+                        arb := 
utils.NewReadBuffer(utils.Int8ToUint8(destinationAddress))
+                        if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 3); err2 == nil {
+                            m.leve3AddressCache[addressData] = 
driverModel.CastKnxGroupAddress3Level(address)
+                        }
+                        arb.Reset()
+                        if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 2); err2 == nil {
+                            m.leve2AddressCache[addressData] = 
driverModel.CastKnxGroupAddress2Level(address)
+                        }
+                        arb.Reset()
+                        if address, err2 := 
driverModel.KnxGroupAddressParse(arb, 1); err2 == nil {
+                            m.leve1AddressCache[addressData] = 
driverModel.CastKnxGroupAddressFreeLevel(address)
+                        }
+                    }
+                    changed = true
+                }
+                for _, subscriber := range m.subscribers {
+                    subscriber.handleValueChange(cemiDataInd.CemiDataFrame, 
changed)
+                }
+            }
+        }()
+    }
+}
+
+func (m *KnxNetIpConnection) getGroupAddressNumLevels() uint8 {
+    if val, ok := m.options["group-address-num-levels"]; ok {
+        groupAddressNumLevels, err := strconv.Atoi(val[0])
+        if err == nil {
+            return uint8(groupAddressNumLevels)
+        }
+    }
+    return 3
+}
+
+func (m *KnxNetIpConnection) addSubscriber(subscriber *KnxNetIpSubscriber) {
+    for _, sub := range m.subscribers {
+        if sub == subscriber {
+            return
+        }
+    }
+    m.subscribers = append(m.subscribers, subscriber)
+}
+
+func (m *KnxNetIpConnection) removeSubscriber(subscriber *KnxNetIpSubscriber) {
+    for i, sub := range m.subscribers {
+        if sub == subscriber {
+            m.subscribers = append(m.subscribers[:i], m.subscribers[i+1:]...)
+        }
+    }
+}
+
+func (m *KnxNetIpConnection) sliceEqual(a, b []int8) bool {
+    if len(a) != len(b) {
+        return false
+    }
+    for i, v := range a {
+        if v != b[i] {
+            return false
+        }
+    }
+    return true
+}
+
+func (m *KnxNetIpConnection) knxAddressToString(knxAddress 
*driverModel.KnxAddress) string {
+    return strconv.Itoa(int(knxAddress.MainGroup)) + "." + 
strconv.Itoa(int(knxAddress.MiddleGroup)) + "." + 
strconv.Itoa(int(knxAddress.SubGroup))
+}
diff --git a/plc4go/internal/plc4go/modbus/ModbusConnection.go 
b/plc4go/internal/plc4go/modbus/ModbusConnection.go
index 4fa39ef..cd776d3 100644
--- a/plc4go/internal/plc4go/modbus/ModbusConnection.go
+++ b/plc4go/internal/plc4go/modbus/ModbusConnection.go
@@ -33,6 +33,11 @@ type ConnectionMetadata struct {
     apiModel.PlcConnectionMetadata
 }
 
+func (m ConnectionMetadata) GetConnectionAttributes() map[string]string {
+    return map[string]string {
+    }
+}
+
 func (m ConnectionMetadata) CanRead() bool {
        return true
 }
diff --git a/plc4go/pkg/plc4go/model/plc_connection_metadata.go 
b/plc4go/pkg/plc4go/model/plc_connection_metadata.go
index c7e6f6b..382f47d 100644
--- a/plc4go/pkg/plc4go/model/plc_connection_metadata.go
+++ b/plc4go/pkg/plc4go/model/plc_connection_metadata.go
@@ -21,10 +21,15 @@ package model
 // Information about connection capabilities.
 // This includes connection and driver specific metadata.
 type PlcConnectionMetadata interface {
+
+    // Gives access to a map of additional information the driver might be 
able to provide.
+    GetConnectionAttributes() map[string]string
+
        // Indicates that the connection supports reading.
        CanRead() bool
        // Indicates that the connection supports writing.
        CanWrite() bool
        // Indicates that the connection supports subscription.
        CanSubscribe() bool
+
 }

Reply via email to