This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83606befb0 feat(plc4go/bacnet): use upstream device info cache
83606befb0 is described below

commit 83606befb06f9012aa68339dd328d068a83b9a54
Author: Sebastian Rühl <sru...@apache.org>
AuthorDate: Mon Nov 14 14:16:31 2022 +0100

    feat(plc4go/bacnet): use upstream device info cache
---
 plc4go/internal/bacnetip/ApplicationLayer.go  |  71 ++++++-----
 plc4go/internal/bacnetip/ApplicationModule.go | 176 +++++++++++++++++++++++++-
 plc4go/internal/bacnetip/Device.go            |  22 ++++
 plc4go/internal/bacnetip/DeviceInventory.go   |   1 +
 4 files changed, 235 insertions(+), 35 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go 
b/plc4go/internal/bacnetip/ApplicationLayer.go
index c2a1324adc..04dc50f415 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -75,8 +75,8 @@ type segmentAPDU struct {
 type SSMSAPRequirements interface {
        _ServiceAccessPoint
        _Client
-       GetDeviceInventory() *DeviceInventory
-       GetLocalDevice() DeviceEntry
+       GetDeviceInfoCache() *DeviceInfoCache
+       GetLocalDevice() LocalDeviceObject
        GetProposedWindowSize() uint8
        GetClientTransactions() []*ClientSSM
        GetServerTransactions() []*ServerSSM
@@ -89,8 +89,8 @@ type SSM struct {
 
        ssmSAP SSMSAPRequirements
 
-       pduAddress  []byte
-       deviceEntry *DeviceEntry
+       pduAddress []byte
+       deviceInfo *DeviceInfo
 
        invokeId uint8
 
@@ -116,15 +116,16 @@ type SSM struct {
 
 func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
        log.Debug().Interface("sap", sap).Bytes("pdu_address", 
pduAddress).Msg("init")
-       deviceEntry, err := 
sap.GetDeviceInventory().getEntryForDestination(pduAddress)
-       if err != nil {
-               return SSM{}, errors.Wrap(err, "Can't create SSM")
+       var deviceInfo *DeviceInfo
+       deviceInfoTemp, ok := 
sap.GetDeviceInfoCache().GetDeviceInfo(DeviceInfoCacheKey{PduSource: 
pduAddress})
+       if ok {
+               deviceInfo = &deviceInfoTemp
        }
        localDevice := sap.GetLocalDevice()
        return SSM{
                ssmSAP:                sap,
                pduAddress:            pduAddress,
-               deviceEntry:           deviceEntry,
+               deviceInfo:            deviceInfo,
                state:                 IDLE,
                numberOfApduRetries:   localDevice.NumberOfAPDURetries,
                apduTimeout:           localDevice.APDUTimeout,
@@ -342,7 +343,7 @@ func NewClientSSM(sap SSMSAPRequirements, pduAddress 
[]byte) (*ClientSSM, error)
                return nil, err
        }
        // TODO: if deviceEntry is not there get it now...
-       if ssm.deviceEntry == nil {
+       if ssm.deviceInfo == nil {
                // TODO: get entry for device, store it in inventory
                log.Debug().Msg("Accquire device information")
        }
@@ -362,7 +363,7 @@ func (s *ClientSSM) setState(newState SSMState, timer 
*uint) error {
        if s.state == COMPLETED || s.state == ABORTED {
                log.Debug().Msg("remove from active transaction")
                s.ssmSAP.GetClientTransactions() // TODO remove "this" 
transaction from the list
-               if s.deviceEntry == nil {
+               if s.deviceInfo == nil {
                        // TODO: release device entry
                        log.Debug().Msg("release device entry")
                }
@@ -395,13 +396,13 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) 
error { // TODO: maybe
 
        // if the max apdu length of the server isn't known, assume that it is 
the same size as our own and will be the segment
        //        size
-       if s.deviceEntry == nil || s.deviceEntry.MaximumApduLengthAccepted != 
nil {
+       if s.deviceInfo == nil || s.deviceInfo.MaximumApduLengthAccepted != nil 
{
                s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
-       } else if s.deviceEntry.MaximumNpduLength == nil {
+       } else if s.deviceInfo.MaximumNpduLength == nil {
                //      if the max npdu length of the server isn't known, 
assume that it is the same as the max apdu length accepted
                s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
        } else {
-               s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, 
uint(s.maxApduLengthAccepted.NumberOfOctets()))
+               s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, 
uint(s.maxApduLengthAccepted.NumberOfOctets()))
        }
        log.Debug().Msgf("segment size %d", s.segmentSize)
 
@@ -426,9 +427,9 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) 
error { // TODO: maybe
                        return s.Response(abort)
                }
 
-               if s.deviceEntry == nil {
+               if s.deviceInfo == nil {
                        log.Debug().Msg("no server info for segmentation 
support")
-               } else if s.deviceEntry.SegmentationSupported != 
readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && 
s.deviceEntry.SegmentationSupported != 
readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
+               } else if *s.deviceInfo.SegmentationSupported != 
readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && 
*s.deviceInfo.SegmentationSupported != 
readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
                        log.Debug().Msg("server can't receive segmented 
requests")
                        abort, err := 
s.abort(readWriteModel.BACnetAbortReason_SEGMENTATION_NOT_SUPPORTED)
                        if err != nil {
@@ -438,11 +439,11 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) 
error { // TODO: maybe
                }
 
                // make sure we don't exceed the number of segments in our 
request that the server said it was willing to accept
-               if s.deviceEntry == nil {
+               if s.deviceInfo == nil {
                        log.Debug().Msg("no server info for maximum number of 
segments")
-               } else if s.deviceEntry.MaxSegmentsAccepted == nil {
+               } else if s.deviceInfo.MaxSegmentsAccepted == nil {
                        log.Debug().Msgf("server doesn't say maximum number of 
segments")
-               } else if s.segmentCount > 
s.deviceEntry.MaxSegmentsAccepted.MaxSegments() {
+               } else if s.segmentCount > 
s.deviceInfo.MaxSegmentsAccepted.MaxSegments() {
                        log.Debug().Msg("server can't receive enough segments")
                        abort, err := 
s.abort(readWriteModel.BACnetAbortReason_APDU_TOO_LONG)
                        if err != nil {
@@ -909,7 +910,7 @@ func NewServerSSM(sap SSMSAPRequirements, pduAddress 
[]byte) (*ServerSSM, error)
                return nil, err
        }
        // TODO: if deviceEntry is not there get it now...
-       if &ssm.deviceEntry == nil {
+       if &ssm.deviceInfo == nil {
                // TODO: get entry for device, store it in inventory
                log.Debug().Msg("Accquire device information")
        }
@@ -930,7 +931,7 @@ func (s *ServerSSM) setState(newState SSMState, timer 
*uint) error {
        if s.state == COMPLETED || s.state == ABORTED {
                log.Debug().Msg("remove from active transaction")
                s.ssmSAP.GetServerTransactions() // TODO remove "this" 
transaction from the list
-               if s.deviceEntry != nil {
+               if s.deviceInfo != nil {
                        // TODO: release device entry
                        log.Debug().Msg("release device entry")
                }
@@ -1020,10 +1021,10 @@ func (s *ServerSSM) Confirmation(apdu 
readWriteModel.APDU) error {
 
                // the segment size is the minimum of the size of the largest 
packet that can be delivered to the client and the
                //            largest it can accept
-               if s.deviceEntry == nil || s.deviceEntry.MaximumNpduLength == 
nil {
+               if s.deviceInfo == nil || s.deviceInfo.MaximumNpduLength == nil 
{
                        s.segmentSize = 
uint(s.maxApduLengthAccepted.NumberOfOctets())
                } else {
-                       s.segmentSize = 
utils.Min(*s.deviceEntry.MaximumNpduLength, 
uint(s.maxApduLengthAccepted.NumberOfOctets()))
+                       s.segmentSize = 
utils.Min(*s.deviceInfo.MaximumNpduLength, 
uint(s.maxApduLengthAccepted.NumberOfOctets()))
                }
 
                // compute the segment count
@@ -1158,16 +1159,18 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) 
error {
        s.segmentedResponseAccepted = 
apduConfirmedRequest.GetSegmentedResponseAccepted()
 
        // if there is a cache record, check to see if it needs to be updated
-       if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceEntry 
!= nil {
-               switch s.deviceEntry.SegmentationSupported {
+       if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceInfo 
!= nil {
+               switch *s.deviceInfo.SegmentationSupported {
                case readWriteModel.BACnetSegmentation_NO_SEGMENTATION:
                        log.Debug().Msg("client actually supports segmented 
receive")
-                       s.deviceEntry.SegmentationSupported = 
readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+                       segmentedReceive := 
readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+                       s.deviceInfo.SegmentationSupported = &segmentedReceive
 
                // TODO: bacpypes updates the cache here but as we have a 
pointer  to the entry we should need that. Maybe we should because 
concurrency... lets see later
                case readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT:
                        log.Debug().Msg("client actually supports both 
segmented transmit and receive")
-                       s.deviceEntry.SegmentationSupported = 
readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+                       segmentedBoth := 
readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+                       s.deviceInfo.SegmentationSupported = &segmentedBoth
 
                        // TODO: bacpypes updates the cache here but as we have 
a pointer  to the entry we should need that. Maybe we should because 
concurrency... lets see later
                case readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE, 
readWriteModel.BACnetSegmentation_SEGMENTED_BOTH:
@@ -1182,11 +1185,11 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) 
error {
        //        received
        getMaxApduLengthAccepted := 
apduConfirmedRequest.GetMaxApduLengthAccepted()
        s.maxApduLengthAccepted = &getMaxApduLengthAccepted
-       if s.deviceEntry != nil && s.deviceEntry.MaximumApduLengthAccepted != 
nil {
-               if *s.deviceEntry.MaximumApduLengthAccepted < 
*s.maxApduLengthAccepted {
+       if s.deviceInfo != nil && s.deviceInfo.MaximumApduLengthAccepted != nil 
{
+               if *s.deviceInfo.MaximumApduLengthAccepted < 
*s.maxApduLengthAccepted {
                        log.Debug().Msg("apdu max reponse encoding error")
                } else {
-                       s.maxApduLengthAccepted = 
s.deviceEntry.MaximumApduLengthAccepted
+                       s.maxApduLengthAccepted = 
s.deviceInfo.MaximumApduLengthAccepted
                }
        }
        log.Debug().Msgf("maxApduLengthAccepted %s", *s.maxApduLengthAccepted)
@@ -1459,8 +1462,8 @@ type StateMachineAccessPoint struct {
        *Client
        *ServiceAccessPoint
 
-       localDevice           DeviceEntry
-       deviceInventory       *DeviceInventory
+       localDevice           LocalDeviceObject
+       deviceInventory       *DeviceInfoCache
        nextInvokeId          uint8
        clientTransactions    []*ClientSSM
        serverTransactions    []*ServerSSM
@@ -1475,7 +1478,7 @@ type StateMachineAccessPoint struct {
        applicationTimeout    uint
 }
 
-func NewStateMachineAccessPoint(localDevice DeviceEntry, deviceInventory 
*DeviceInventory, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory 
*DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
        log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v 
deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
 
        s := &StateMachineAccessPoint{
@@ -1788,11 +1791,11 @@ func (s *StateMachineAccessPoint) SapConfirmation(apdu 
readWriteModel.APDU, pduD
        return nil
 }
 
-func (s *StateMachineAccessPoint) GetDeviceInventory() *DeviceInventory {
+func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
        return s.deviceInventory
 }
 
-func (s *StateMachineAccessPoint) GetLocalDevice() DeviceEntry {
+func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
        return s.localDevice
 }
 
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go 
b/plc4go/internal/bacnetip/ApplicationModule.go
index 17aba59782..0ae673addd 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -20,10 +20,184 @@
 package bacnetip
 
 import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       readWriteModel 
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
        "github.com/pkg/errors"
        "github.com/rs/zerolog/log"
+       "hash/fnv"
 )
 
+type DeviceInfo struct {
+       DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
+       Address          []byte
+
+       MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+       SegmentationSupported     *readWriteModel.BACnetSegmentation
+       MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+       VendorId                  *readWriteModel.BACnetVendorId
+       MaximumNpduLength         *uint
+
+       _refCount int
+       _cacheKey DeviceInfoCacheKey
+}
+
+func NewDeviceInfo(deviceIdentifier 
readWriteModel.BACnetTagPayloadObjectIdentifier, address []byte) *DeviceInfo {
+       return &DeviceInfo{
+               DeviceIdentifier: deviceIdentifier,
+               Address:          address,
+
+               MaximumApduLengthAccepted: func() 
*readWriteModel.MaxApduLengthAccepted {
+                       octets1024 := 
readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
+                       return &octets1024
+               }(),
+               SegmentationSupported: func() 
*readWriteModel.BACnetSegmentation {
+                       noSegmentation := 
readWriteModel.BACnetSegmentation_NO_SEGMENTATION
+                       return &noSegmentation
+               }(),
+       }
+}
+
+// DeviceInfoCacheKey caches by either Instance, PduSource of both
+type DeviceInfoCacheKey struct {
+       Instance  *uint32
+       PduSource []byte
+}
+
+func (k DeviceInfoCacheKey) HashKey() uint32 {
+       h := fnv.New32a()
+       if k.Instance != nil {
+               _ = binary.Write(h, binary.BigEndian, *k.Instance)
+       }
+       _, _ = h.Write(k.PduSource)
+       return h.Sum32()
+}
+
+func (k DeviceInfoCacheKey) String() string {
+       return fmt.Sprintf("key: %d/%x", k.Instance, k.PduSource)
+}
+
+type DeviceInfoCache struct {
+       cache map[uint32]DeviceInfo
+}
+
+func NewDeviceInfoCache() *DeviceInfoCache {
+       return &DeviceInfoCache{
+               cache: make(map[uint32]DeviceInfo),
+       }
+}
+
+// HasDeviceInfo Return true if cache has information about the device.
+func (i *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool {
+       _, ok := i.cache[key.HashKey()]
+       return ok
+}
+
+// IAmDeviceInfo Create a device information record based on the contents of 
an IAmRequest and put it in the cache.
+func (i *DeviceInfoCache) IAmDeviceInfo(iAm 
readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) {
+       log.Debug().Msgf("IAmDeviceInfo\n%s", iAm)
+
+       deviceIdentifier := iAm.GetDeviceIdentifier()
+       // Get the device instance
+       deviceInstance := deviceIdentifier.GetInstanceNumber()
+
+       // get the existing cache record if it exists
+       deviceInfo, ok := i.cache[DeviceInfoCacheKey{&deviceInstance, 
nil}.HashKey()]
+
+       // maybe there is a record for this address
+       if !ok {
+               deviceInfo, ok = i.cache[DeviceInfoCacheKey{nil, 
pduSource}.HashKey()]
+       }
+
+       // make a new one using the class provided
+       if !ok {
+               deviceInfo = DeviceInfo{
+                       DeviceIdentifier: deviceIdentifier.GetPayload(),
+                       Address:          pduSource,
+               }
+       }
+
+       // jam in the correct values
+       maximumApduLengthAccepted := 
readWriteModel.MaxApduLengthAccepted(iAm.GetMaximumApduLengthAcceptedLength().GetActualValue())
+       deviceInfo.MaximumApduLengthAccepted = &maximumApduLengthAccepted
+       sementationSupported := iAm.GetSegmentationSupported().GetValue()
+       deviceInfo.SegmentationSupported = &sementationSupported
+       vendorId := iAm.GetVendorId().GetValue()
+       deviceInfo.VendorId = &vendorId
+
+       // tell the cache this is an updated record
+       i.UpdateDeviceInfo(deviceInfo)
+}
+
+// GetDeviceInfo gets a DeviceInfo from cache
+func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, 
bool) {
+       log.Debug().Msgf("GetDeviceInfo %s", key)
+
+       // get the info if it's there
+       deviceInfo, ok := i.cache[key.HashKey()]
+       log.Debug().Msgf("deviceInfo: %#v", deviceInfo)
+
+       return deviceInfo, ok
+}
+
+// UpdateDeviceInfo The application has updated one or more fields in the 
device information record and the cache needs
+//        to be updated to reflect the changes.  If this is a cached version 
of a persistent record then this is the
+//        opportunity to update the database.
+func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
+       log.Debug().Msgf("UpdateDeviceInfo %#v", deviceInfo)
+
+       // get the current key
+       cacheKey := deviceInfo._cacheKey
+       if cacheKey.Instance != nil && 
deviceInfo.DeviceIdentifier.GetInstanceNumber() != *cacheKey.Instance {
+               instanceNumber := 
deviceInfo.DeviceIdentifier.GetInstanceNumber()
+               cacheKey.Instance = &instanceNumber
+               delete(i.cache, cacheKey.HashKey())
+               i.cache[DeviceInfoCacheKey{Instance: 
&instanceNumber}.HashKey()] = deviceInfo
+       }
+       if bytes.Compare(deviceInfo.Address, cacheKey.PduSource) != 0 {
+               cacheKey.PduSource = deviceInfo.Address
+               delete(i.cache, cacheKey.HashKey())
+               i.cache[DeviceInfoCacheKey{PduSource: 
cacheKey.PduSource}.HashKey()] = deviceInfo
+       }
+
+       // update the key
+       instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber()
+       deviceInfo._cacheKey = DeviceInfoCacheKey{
+               Instance:  &instanceNumber,
+               PduSource: deviceInfo.Address,
+       }
+       i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
+}
+
+// Acquire Return the known information about the device and mark the record 
as being used by a segmentation state
+//        machine.
+func (i *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+       log.Debug().Msgf("Acquire %#v", key)
+
+       deviceInfo, ok := i.cache[key.HashKey()]
+       if ok {
+               deviceInfo._refCount++
+               i.cache[key.HashKey()] = deviceInfo
+       }
+
+       return deviceInfo, ok
+}
+
+// Release This function is called by the segmentation state machine when it 
has finished with the device information.
+func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
+
+       //this information record might be used by more than one SSM
+       if deviceInfo._refCount == 0 {
+               return errors.New("reference count")
+       }
+
+       // decrement the reference count
+       deviceInfo._refCount--
+       i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
+       return nil
+}
+
 // TODO: implement
 type Application struct {
        ApplicationServiceElement
@@ -58,7 +232,7 @@ type BIPSimpleApplication struct {
        mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice DeviceEntry, localAddress, 
deviceInfoCache *DeviceInventory, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, 
deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
        b := &BIPSimpleApplication{}
        var err error
        b.ApplicationIOController, err = 
NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go 
b/plc4go/internal/bacnetip/Device.go
index cbf38a66aa..cf0e825ddd 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -19,6 +19,8 @@
 
 package bacnetip
 
+import readWriteModel 
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+
 type WhoIsIAmServices struct {
 }
 
@@ -26,3 +28,23 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
        // TODO: implement me
        return nil, nil
 }
+
+type LocalDeviceObject struct {
+       NumberOfAPDURetries       uint
+       APDUTimeout               uint
+       SegmentationSupported     readWriteModel.BACnetSegmentation
+       APDUSegmentTimeout        uint
+       MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+       MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+}
+
+func NewLocalDeviceObject() *LocalDeviceObject {
+       return &LocalDeviceObject{
+               NumberOfAPDURetries:       0,
+               APDUTimeout:               0,
+               SegmentationSupported:     0,
+               APDUSegmentTimeout:        0,
+               MaxSegmentsAccepted:       nil,
+               MaximumApduLengthAccepted: nil,
+       }
+}
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go 
b/plc4go/internal/bacnetip/DeviceInventory.go
index 350b3c0393..86f3f5c690 100644
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -26,6 +26,7 @@ import (
        "time"
 )
 
+// TODO: migrate into device info cache
 type DeviceInventory struct {
        sync.RWMutex
        devices map[string]DeviceEntry

Reply via email to