This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push: new 83606befb0 feat(plc4go/bacnet): use upstream device info cache 83606befb0 is described below commit 83606befb06f9012aa68339dd328d068a83b9a54 Author: Sebastian Rühl <sru...@apache.org> AuthorDate: Mon Nov 14 14:16:31 2022 +0100 feat(plc4go/bacnet): use upstream device info cache --- plc4go/internal/bacnetip/ApplicationLayer.go | 71 ++++++----- plc4go/internal/bacnetip/ApplicationModule.go | 176 +++++++++++++++++++++++++- plc4go/internal/bacnetip/Device.go | 22 ++++ plc4go/internal/bacnetip/DeviceInventory.go | 1 + 4 files changed, 235 insertions(+), 35 deletions(-) diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go index c2a1324adc..04dc50f415 100644 --- a/plc4go/internal/bacnetip/ApplicationLayer.go +++ b/plc4go/internal/bacnetip/ApplicationLayer.go @@ -75,8 +75,8 @@ type segmentAPDU struct { type SSMSAPRequirements interface { _ServiceAccessPoint _Client - GetDeviceInventory() *DeviceInventory - GetLocalDevice() DeviceEntry + GetDeviceInfoCache() *DeviceInfoCache + GetLocalDevice() LocalDeviceObject GetProposedWindowSize() uint8 GetClientTransactions() []*ClientSSM GetServerTransactions() []*ServerSSM @@ -89,8 +89,8 @@ type SSM struct { ssmSAP SSMSAPRequirements - pduAddress []byte - deviceEntry *DeviceEntry + pduAddress []byte + deviceInfo *DeviceInfo invokeId uint8 @@ -116,15 +116,16 @@ type SSM struct { func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) { log.Debug().Interface("sap", sap).Bytes("pdu_address", pduAddress).Msg("init") - deviceEntry, err := sap.GetDeviceInventory().getEntryForDestination(pduAddress) - if err != nil { - return SSM{}, errors.Wrap(err, "Can't create SSM") + var deviceInfo *DeviceInfo + deviceInfoTemp, ok := sap.GetDeviceInfoCache().GetDeviceInfo(DeviceInfoCacheKey{PduSource: pduAddress}) + if ok { + deviceInfo = &deviceInfoTemp } localDevice := sap.GetLocalDevice() return SSM{ ssmSAP: sap, pduAddress: pduAddress, - deviceEntry: deviceEntry, + deviceInfo: deviceInfo, state: IDLE, numberOfApduRetries: localDevice.NumberOfAPDURetries, apduTimeout: localDevice.APDUTimeout, @@ -342,7 +343,7 @@ func NewClientSSM(sap SSMSAPRequirements, pduAddress []byte) (*ClientSSM, error) return nil, err } // TODO: if deviceEntry is not there get it now... - if ssm.deviceEntry == nil { + if ssm.deviceInfo == nil { // TODO: get entry for device, store it in inventory log.Debug().Msg("Accquire device information") } @@ -362,7 +363,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error { if s.state == COMPLETED || s.state == ABORTED { log.Debug().Msg("remove from active transaction") s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list - if s.deviceEntry == nil { + if s.deviceInfo == nil { // TODO: release device entry log.Debug().Msg("release device entry") } @@ -395,13 +396,13 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe // if the max apdu length of the server isn't known, assume that it is the same size as our own and will be the segment // size - if s.deviceEntry == nil || s.deviceEntry.MaximumApduLengthAccepted != nil { + if s.deviceInfo == nil || s.deviceInfo.MaximumApduLengthAccepted != nil { s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets()) - } else if s.deviceEntry.MaximumNpduLength == nil { + } else if s.deviceInfo.MaximumNpduLength == nil { // if the max npdu length of the server isn't known, assume that it is the same as the max apdu length accepted s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets()) } else { - s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets())) + s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets())) } log.Debug().Msgf("segment size %d", s.segmentSize) @@ -426,9 +427,9 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe return s.Response(abort) } - if s.deviceEntry == nil { + if s.deviceInfo == nil { log.Debug().Msg("no server info for segmentation support") - } else if s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH { + } else if *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH { log.Debug().Msg("server can't receive segmented requests") abort, err := s.abort(readWriteModel.BACnetAbortReason_SEGMENTATION_NOT_SUPPORTED) if err != nil { @@ -438,11 +439,11 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe } // make sure we don't exceed the number of segments in our request that the server said it was willing to accept - if s.deviceEntry == nil { + if s.deviceInfo == nil { log.Debug().Msg("no server info for maximum number of segments") - } else if s.deviceEntry.MaxSegmentsAccepted == nil { + } else if s.deviceInfo.MaxSegmentsAccepted == nil { log.Debug().Msgf("server doesn't say maximum number of segments") - } else if s.segmentCount > s.deviceEntry.MaxSegmentsAccepted.MaxSegments() { + } else if s.segmentCount > s.deviceInfo.MaxSegmentsAccepted.MaxSegments() { log.Debug().Msg("server can't receive enough segments") abort, err := s.abort(readWriteModel.BACnetAbortReason_APDU_TOO_LONG) if err != nil { @@ -909,7 +910,7 @@ func NewServerSSM(sap SSMSAPRequirements, pduAddress []byte) (*ServerSSM, error) return nil, err } // TODO: if deviceEntry is not there get it now... - if &ssm.deviceEntry == nil { + if &ssm.deviceInfo == nil { // TODO: get entry for device, store it in inventory log.Debug().Msg("Accquire device information") } @@ -930,7 +931,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error { if s.state == COMPLETED || s.state == ABORTED { log.Debug().Msg("remove from active transaction") s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list - if s.deviceEntry != nil { + if s.deviceInfo != nil { // TODO: release device entry log.Debug().Msg("release device entry") } @@ -1020,10 +1021,10 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error { // the segment size is the minimum of the size of the largest packet that can be delivered to the client and the // largest it can accept - if s.deviceEntry == nil || s.deviceEntry.MaximumNpduLength == nil { + if s.deviceInfo == nil || s.deviceInfo.MaximumNpduLength == nil { s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets()) } else { - s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets())) + s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets())) } // compute the segment count @@ -1158,16 +1159,18 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error { s.segmentedResponseAccepted = apduConfirmedRequest.GetSegmentedResponseAccepted() // if there is a cache record, check to see if it needs to be updated - if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceEntry != nil { - switch s.deviceEntry.SegmentationSupported { + if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceInfo != nil { + switch *s.deviceInfo.SegmentationSupported { case readWriteModel.BACnetSegmentation_NO_SEGMENTATION: log.Debug().Msg("client actually supports segmented receive") - s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE + segmentedReceive := readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE + s.deviceInfo.SegmentationSupported = &segmentedReceive // TODO: bacpypes updates the cache here but as we have a pointer to the entry we should need that. Maybe we should because concurrency... lets see later case readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT: log.Debug().Msg("client actually supports both segmented transmit and receive") - s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_BOTH + segmentedBoth := readWriteModel.BACnetSegmentation_SEGMENTED_BOTH + s.deviceInfo.SegmentationSupported = &segmentedBoth // TODO: bacpypes updates the cache here but as we have a pointer to the entry we should need that. Maybe we should because concurrency... lets see later case readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE, readWriteModel.BACnetSegmentation_SEGMENTED_BOTH: @@ -1182,11 +1185,11 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error { // received getMaxApduLengthAccepted := apduConfirmedRequest.GetMaxApduLengthAccepted() s.maxApduLengthAccepted = &getMaxApduLengthAccepted - if s.deviceEntry != nil && s.deviceEntry.MaximumApduLengthAccepted != nil { - if *s.deviceEntry.MaximumApduLengthAccepted < *s.maxApduLengthAccepted { + if s.deviceInfo != nil && s.deviceInfo.MaximumApduLengthAccepted != nil { + if *s.deviceInfo.MaximumApduLengthAccepted < *s.maxApduLengthAccepted { log.Debug().Msg("apdu max reponse encoding error") } else { - s.maxApduLengthAccepted = s.deviceEntry.MaximumApduLengthAccepted + s.maxApduLengthAccepted = s.deviceInfo.MaximumApduLengthAccepted } } log.Debug().Msgf("maxApduLengthAccepted %s", *s.maxApduLengthAccepted) @@ -1459,8 +1462,8 @@ type StateMachineAccessPoint struct { *Client *ServiceAccessPoint - localDevice DeviceEntry - deviceInventory *DeviceInventory + localDevice LocalDeviceObject + deviceInventory *DeviceInfoCache nextInvokeId uint8 clientTransactions []*ClientSSM serverTransactions []*ServerSSM @@ -1475,7 +1478,7 @@ type StateMachineAccessPoint struct { applicationTimeout uint } -func NewStateMachineAccessPoint(localDevice DeviceEntry, deviceInventory *DeviceInventory, sapID *int, cid *int) (*StateMachineAccessPoint, error) { +func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) { log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid) s := &StateMachineAccessPoint{ @@ -1788,11 +1791,11 @@ func (s *StateMachineAccessPoint) SapConfirmation(apdu readWriteModel.APDU, pduD return nil } -func (s *StateMachineAccessPoint) GetDeviceInventory() *DeviceInventory { +func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache { return s.deviceInventory } -func (s *StateMachineAccessPoint) GetLocalDevice() DeviceEntry { +func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject { return s.localDevice } diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go index 17aba59782..0ae673addd 100644 --- a/plc4go/internal/bacnetip/ApplicationModule.go +++ b/plc4go/internal/bacnetip/ApplicationModule.go @@ -20,10 +20,184 @@ package bacnetip import ( + "bytes" + "encoding/binary" + "fmt" + readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "hash/fnv" ) +type DeviceInfo struct { + DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier + Address []byte + + MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted + SegmentationSupported *readWriteModel.BACnetSegmentation + MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted + VendorId *readWriteModel.BACnetVendorId + MaximumNpduLength *uint + + _refCount int + _cacheKey DeviceInfoCacheKey +} + +func NewDeviceInfo(deviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier, address []byte) *DeviceInfo { + return &DeviceInfo{ + DeviceIdentifier: deviceIdentifier, + Address: address, + + MaximumApduLengthAccepted: func() *readWriteModel.MaxApduLengthAccepted { + octets1024 := readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024 + return &octets1024 + }(), + SegmentationSupported: func() *readWriteModel.BACnetSegmentation { + noSegmentation := readWriteModel.BACnetSegmentation_NO_SEGMENTATION + return &noSegmentation + }(), + } +} + +// DeviceInfoCacheKey caches by either Instance, PduSource of both +type DeviceInfoCacheKey struct { + Instance *uint32 + PduSource []byte +} + +func (k DeviceInfoCacheKey) HashKey() uint32 { + h := fnv.New32a() + if k.Instance != nil { + _ = binary.Write(h, binary.BigEndian, *k.Instance) + } + _, _ = h.Write(k.PduSource) + return h.Sum32() +} + +func (k DeviceInfoCacheKey) String() string { + return fmt.Sprintf("key: %d/%x", k.Instance, k.PduSource) +} + +type DeviceInfoCache struct { + cache map[uint32]DeviceInfo +} + +func NewDeviceInfoCache() *DeviceInfoCache { + return &DeviceInfoCache{ + cache: make(map[uint32]DeviceInfo), + } +} + +// HasDeviceInfo Return true if cache has information about the device. +func (i *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool { + _, ok := i.cache[key.HashKey()] + return ok +} + +// IAmDeviceInfo Create a device information record based on the contents of an IAmRequest and put it in the cache. +func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) { + log.Debug().Msgf("IAmDeviceInfo\n%s", iAm) + + deviceIdentifier := iAm.GetDeviceIdentifier() + // Get the device instance + deviceInstance := deviceIdentifier.GetInstanceNumber() + + // get the existing cache record if it exists + deviceInfo, ok := i.cache[DeviceInfoCacheKey{&deviceInstance, nil}.HashKey()] + + // maybe there is a record for this address + if !ok { + deviceInfo, ok = i.cache[DeviceInfoCacheKey{nil, pduSource}.HashKey()] + } + + // make a new one using the class provided + if !ok { + deviceInfo = DeviceInfo{ + DeviceIdentifier: deviceIdentifier.GetPayload(), + Address: pduSource, + } + } + + // jam in the correct values + maximumApduLengthAccepted := readWriteModel.MaxApduLengthAccepted(iAm.GetMaximumApduLengthAcceptedLength().GetActualValue()) + deviceInfo.MaximumApduLengthAccepted = &maximumApduLengthAccepted + sementationSupported := iAm.GetSegmentationSupported().GetValue() + deviceInfo.SegmentationSupported = &sementationSupported + vendorId := iAm.GetVendorId().GetValue() + deviceInfo.VendorId = &vendorId + + // tell the cache this is an updated record + i.UpdateDeviceInfo(deviceInfo) +} + +// GetDeviceInfo gets a DeviceInfo from cache +func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, bool) { + log.Debug().Msgf("GetDeviceInfo %s", key) + + // get the info if it's there + deviceInfo, ok := i.cache[key.HashKey()] + log.Debug().Msgf("deviceInfo: %#v", deviceInfo) + + return deviceInfo, ok +} + +// UpdateDeviceInfo The application has updated one or more fields in the device information record and the cache needs +// to be updated to reflect the changes. If this is a cached version of a persistent record then this is the +// opportunity to update the database. +func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) { + log.Debug().Msgf("UpdateDeviceInfo %#v", deviceInfo) + + // get the current key + cacheKey := deviceInfo._cacheKey + if cacheKey.Instance != nil && deviceInfo.DeviceIdentifier.GetInstanceNumber() != *cacheKey.Instance { + instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber() + cacheKey.Instance = &instanceNumber + delete(i.cache, cacheKey.HashKey()) + i.cache[DeviceInfoCacheKey{Instance: &instanceNumber}.HashKey()] = deviceInfo + } + if bytes.Compare(deviceInfo.Address, cacheKey.PduSource) != 0 { + cacheKey.PduSource = deviceInfo.Address + delete(i.cache, cacheKey.HashKey()) + i.cache[DeviceInfoCacheKey{PduSource: cacheKey.PduSource}.HashKey()] = deviceInfo + } + + // update the key + instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber() + deviceInfo._cacheKey = DeviceInfoCacheKey{ + Instance: &instanceNumber, + PduSource: deviceInfo.Address, + } + i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo +} + +// Acquire Return the known information about the device and mark the record as being used by a segmentation state +// machine. +func (i *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) { + log.Debug().Msgf("Acquire %#v", key) + + deviceInfo, ok := i.cache[key.HashKey()] + if ok { + deviceInfo._refCount++ + i.cache[key.HashKey()] = deviceInfo + } + + return deviceInfo, ok +} + +// Release This function is called by the segmentation state machine when it has finished with the device information. +func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error { + + //this information record might be used by more than one SSM + if deviceInfo._refCount == 0 { + return errors.New("reference count") + } + + // decrement the reference count + deviceInfo._refCount-- + i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo + return nil +} + // TODO: implement type Application struct { ApplicationServiceElement @@ -58,7 +232,7 @@ type BIPSimpleApplication struct { mux *UDPMultiplexer } -func NewBIPSimpleApplication(localDevice DeviceEntry, localAddress, deviceInfoCache *DeviceInventory, aseID *int) (*BIPSimpleApplication, error) { +func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) { b := &BIPSimpleApplication{} var err error b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID) diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go index cbf38a66aa..cf0e825ddd 100644 --- a/plc4go/internal/bacnetip/Device.go +++ b/plc4go/internal/bacnetip/Device.go @@ -19,6 +19,8 @@ package bacnetip +import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model" + type WhoIsIAmServices struct { } @@ -26,3 +28,23 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) { // TODO: implement me return nil, nil } + +type LocalDeviceObject struct { + NumberOfAPDURetries uint + APDUTimeout uint + SegmentationSupported readWriteModel.BACnetSegmentation + APDUSegmentTimeout uint + MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted + MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted +} + +func NewLocalDeviceObject() *LocalDeviceObject { + return &LocalDeviceObject{ + NumberOfAPDURetries: 0, + APDUTimeout: 0, + SegmentationSupported: 0, + APDUSegmentTimeout: 0, + MaxSegmentsAccepted: nil, + MaximumApduLengthAccepted: nil, + } +} diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go index 350b3c0393..86f3f5c690 100644 --- a/plc4go/internal/bacnetip/DeviceInventory.go +++ b/plc4go/internal/bacnetip/DeviceInventory.go @@ -26,6 +26,7 @@ import ( "time" ) +// TODO: migrate into device info cache type DeviceInventory struct { sync.RWMutex devices map[string]DeviceEntry