This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 35ff4b5b6803b4eeb449af3169c4bcdbfbbf2add Author: Sebastian Rühl <[email protected]> AuthorDate: Wed Jul 7 17:32:19 2021 +0200 plc4go: initial bacnet draft --- plc4go/internal/plc4go/bacnetip/Connection.go | 77 ++++++++++++++++ plc4go/internal/plc4go/bacnetip/Driver.go | 81 +++++++++++++++- plc4go/internal/plc4go/bacnetip/Field.go | 92 +++++++++++++++++++ plc4go/internal/plc4go/bacnetip/FieldHandler.go | 64 +++++++++++++ plc4go/internal/plc4go/bacnetip/MessageCodec.go | 102 +++++++++++++++++++++ plc4go/internal/plc4go/bacnetip/Subscriber.go | 70 ++++++++++++++ .../plc4go/bacnetip/{Driver.go => ValueHandler.go} | 12 ++- 7 files changed, 493 insertions(+), 5 deletions(-) diff --git a/plc4go/internal/plc4go/bacnetip/Connection.go b/plc4go/internal/plc4go/bacnetip/Connection.go new file mode 100644 index 0000000..ace2a38 --- /dev/null +++ b/plc4go/internal/plc4go/bacnetip/Connection.go @@ -0,0 +1,77 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package bacnetip + +import ( + "fmt" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default" + internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model" + "github.com/apache/plc4x/plc4go/pkg/plc4go" + "github.com/apache/plc4x/plc4go/pkg/plc4go/model" + "github.com/rs/zerolog/log" +) + +type Connection struct { + _default.DefaultConnection + messageCodec spi.MessageCodec + subscribers []*Subscriber +} + +func NewConnection(messageCodec spi.MessageCodec, fieldHandler spi.PlcFieldHandler) *Connection { + connection := &Connection{ + messageCodec: messageCodec, + } + connection.DefaultConnection = _default.NewDefaultConnection(connection, + _default.WithPlcFieldHandler(fieldHandler), + _default.WithPlcValueHandler(NewValueHandler()), + ) + return connection +} + +func (c *Connection) GetConnection() plc4go.PlcConnection { + return c +} + +func (c *Connection) GetMessageCodec() spi.MessageCodec { + return c.messageCodec +} + +func (c *Connection) SubscriptionRequestBuilder() model.PlcSubscriptionRequestBuilder { + return internalModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcFieldHandler(), c.GetPlcValueHandler(), NewSubscriber(c)) +} + +func (c *Connection) UnsubscriptionRequestBuilder() model.PlcUnsubscriptionRequestBuilder { + panic("Not implementec yet. (at least as a default)") +} + +func (m *Connection) addSubscriber(subscriber *Subscriber) { + for _, sub := range m.subscribers { + if sub == subscriber { + log.Debug().Msgf("Subscriber %v already added", subscriber) + return + } + } + m.subscribers = append(m.subscribers, subscriber) +} + +func (c *Connection) String() string { + return fmt.Sprintf("bacnetip.Connection") +} diff --git a/plc4go/internal/plc4go/bacnetip/Driver.go b/plc4go/internal/plc4go/bacnetip/Driver.go index 3620e61..635b212 100644 --- a/plc4go/internal/plc4go/bacnetip/Driver.go +++ b/plc4go/internal/plc4go/bacnetip/Driver.go @@ -19,8 +19,85 @@ package bacnetip -import "github.com/apache/plc4x/plc4go/pkg/plc4go" +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/pkg/plc4go" + apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "net/url" +) + +type Driver struct { + fieldHandler spi.PlcFieldHandler + awaitSetupComplete bool + awaitDisconnectComplete bool +} func NewDriver() plc4go.PlcDriver { - return nil + return &Driver{ + fieldHandler: NewFieldHandler(), + awaitSetupComplete: true, + awaitDisconnectComplete: true, + } +} + +func (m *Driver) GetProtocolCode() string { + return "bacnet-ip" +} + +func (m *Driver) GetProtocolName() string { + return "BACnet/IP" +} + +func (m *Driver) GetDefaultTransport() string { + return "udp" +} + +func (m *Driver) CheckQuery(query string) error { + _, err := m.fieldHandler.ParseQuery(query) + return err +} + +func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult { + log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options)) + // Get an the transport specified in the url + transport, ok := transports[transportUrl.Scheme] + if !ok { + log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme) + ch := make(chan plc4go.PlcConnectionConnectResult) + go func() { + ch <- plc4go.NewPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl)) + }() + return ch + } + // Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string. + options["defaultUdpPort"] = []string{"47808"} + // Have the transport create a new transport-instance. + transportInstance, err := transport.CreateTransportInstance(transportUrl, options) + if err != nil { + log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"]) + ch := make(chan plc4go.PlcConnectionConnectResult) + go func() { + ch <- plc4go.NewPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String())) + }() + return ch + } + + codec := NewMessageCodec(transportInstance) + log.Debug().Msgf("working with codec %#v", codec) + + // Create the new connection + connection := NewConnection(codec, m.fieldHandler) + log.Debug().Msg("created connection, connecting now") + return connection.Connect() +} + +func (m *Driver) SupportsDiscovery() bool { + return false +} + +func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryEvent)) error { + panic("implement me") } diff --git a/plc4go/internal/plc4go/bacnetip/Field.go b/plc4go/internal/plc4go/bacnetip/Field.go new file mode 100644 index 0000000..fe9a4bd --- /dev/null +++ b/plc4go/internal/plc4go/bacnetip/Field.go @@ -0,0 +1,92 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package bacnetip + +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils" + "strconv" +) + +type BacNetPlcField interface { + GetDeviceIdentifier() uint32 + GetObjectType() uint16 + GetObjectInstance() uint32 +} + +type PlcField struct { + DeviceIdentifier uint32 + ObjectType uint16 + ObjectInstance uint32 +} + +func (m PlcField) GetAddressString() string { + return strconv.Itoa(int(m.DeviceIdentifier)) +} + +func (m PlcField) GetTypeName() string { + return strconv.Itoa(int(m.ObjectType)) +} + +func (m PlcField) GetQuantity() uint16 { + return 1 +} + +func NewField(deviceIdentifier uint32, objectType uint16, objectInstance uint32) PlcField { + return PlcField{ + DeviceIdentifier: deviceIdentifier, + ObjectType: objectType, + ObjectInstance: objectInstance, + } +} + +func (m PlcField) GetDeviceIdentifier() uint32 { + return m.DeviceIdentifier +} + +func (m PlcField) GetObjectType() uint16 { + return m.ObjectType +} + +func (m PlcField) GetObjectInstance() uint32 { + return m.ObjectInstance +} + +func (m PlcField) Serialize(writeBuffer utils.WriteBuffer) error { + if err := writeBuffer.PushContext("BacNetPlcField"); err != nil { + return err + } + + if err := writeBuffer.WriteUint32("deviceIdentifier", 32, m.DeviceIdentifier); err != nil { + return err + } + + if err := writeBuffer.WriteUint16("objectType", 16, m.ObjectType); err != nil { + return err + } + + if err := writeBuffer.WriteUint32("objectInstance", 32, m.ObjectInstance); err != nil { + return err + } + + if err := writeBuffer.PopContext("BacNetPlcField"); err != nil { + return err + } + return nil +} diff --git a/plc4go/internal/plc4go/bacnetip/FieldHandler.go b/plc4go/internal/plc4go/bacnetip/FieldHandler.go new file mode 100644 index 0000000..2a32db4 --- /dev/null +++ b/plc4go/internal/plc4go/bacnetip/FieldHandler.go @@ -0,0 +1,64 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package bacnetip + +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils" + "github.com/apache/plc4x/plc4go/pkg/plc4go/model" + "github.com/pkg/errors" + "regexp" + "strconv" +) + +type FieldHandler struct { + addressPattern *regexp.Regexp +} + +func NewFieldHandler() FieldHandler { + return FieldHandler{ + addressPattern: regexp.MustCompile(`^(?P<deviceIdentifier>(\d|\*))/(?P<objectType>(\d|\*))/(?P<objectInstance>(\d|\*))`), + } +} + +const ( + DEVICE_IDENTIFIER = "deviceIdentifier" + OBJECT_TYPE = "objectType" + OBJECT_INSTANCE = "objectInstance" +) + +func (m FieldHandler) ParseQuery(query string) (model.PlcField, error) { + if match := utils.GetSubgroupMatches(m.addressPattern, query); match != nil { + deviceIdentifier, err := strconv.ParseUint(match[DEVICE_IDENTIFIER], 10, 32) + if err != nil { + return nil, err + } + objectType, err := strconv.ParseUint(match[OBJECT_TYPE], 10, 16) + if err != nil { + return nil, err + } + objectInstance, err := strconv.ParseUint(match[OBJECT_INSTANCE], 10, 32) + if err != nil { + return nil, err + } + + return NewField(uint32(deviceIdentifier), uint16(objectType), uint32(objectInstance)), nil + } + return nil, errors.Errorf("Unable to parse %s", query) +} diff --git a/plc4go/internal/plc4go/bacnetip/MessageCodec.go b/plc4go/internal/plc4go/bacnetip/MessageCodec.go new file mode 100644 index 0000000..1344ca4 --- /dev/null +++ b/plc4go/internal/plc4go/bacnetip/MessageCodec.go @@ -0,0 +1,102 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package bacnetip + +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/bacnetip/readwrite/model" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" +) + +type MessageCodec struct { + _default.DefaultCodec +} + +func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec { + codec := &MessageCodec{} + codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance) + return codec +} + +func (m *MessageCodec) GetCodec() spi.MessageCodec { + return m +} + +func (m *MessageCodec) Send(message interface{}) error { + log.Trace().Msg("Sending message") + // Cast the message to the correct type of struct + bvlcPacket := model.CastBVLC(message) + // Serialize the request + wb := utils.NewWriteBufferByteBased() + err := bvlcPacket.Serialize(wb) + if err != nil { + return errors.Wrap(err, "error serializing request") + } + + // Send it to the PLC + err = m.GetTransportInstance().Write(wb.GetBytes()) + if err != nil { + return errors.Wrap(err, "error sending request") + } + return nil +} + +func (m *MessageCodec) Receive() (interface{}, error) { + log.Trace().Msg("receiving") + // We need at least 6 bytes in order to know how big the packet is in total + if num, err := m.GetTransportInstance().GetNumReadableBytes(); (err == nil) && (num >= 4) { + log.Debug().Msgf("we got %d readable bytes", num) + data, err := m.GetTransportInstance().PeekReadableBytes(4) + if err != nil { + log.Warn().Err(err).Msg("error peeking") + // TODO: Possibly clean up ... + return nil, nil + } + //Second byte for the size and then add the header size 24 + packetSize := uint32((uint16(data[3]) << 8) + uint16(data[2])) + if num < packetSize { + log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize) + return nil, nil + } + data, err = m.GetTransportInstance().Read(packetSize) + if err != nil { + log.Debug().Err(err).Msg("Error reading") + // TODO: Possibly clean up ... + return nil, nil + } + rb := utils.NewReadBufferByteBased(data) + bvlcPacket, err := model.BVLCParse(rb) + if err != nil { + log.Warn().Err(err).Msg("error parsing") + // TODO: Possibly clean up ... + return nil, nil + } + return bvlcPacket, nil + } else if err != nil { + log.Warn().Err(err).Msg("Got error reading") + return nil, nil + } + // TODO: maybe we return here a not enough error error + return nil, nil +} diff --git a/plc4go/internal/plc4go/bacnetip/Subscriber.go b/plc4go/internal/plc4go/bacnetip/Subscriber.go new file mode 100644 index 0000000..8e9d217 --- /dev/null +++ b/plc4go/internal/plc4go/bacnetip/Subscriber.go @@ -0,0 +1,70 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package bacnetip + +import ( + internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model" + apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model" +) + +type Subscriber struct { + connection *Connection + subscriptionRequests []internalModel.DefaultPlcSubscriptionRequest +} + +func NewSubscriber(connection *Connection) *Subscriber { + return &Subscriber{ + connection: connection, + subscriptionRequests: []internalModel.DefaultPlcSubscriptionRequest{}, + } +} + +func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult { + result := make(chan apiModel.PlcSubscriptionRequestResult) + go func() { + // Add this subscriber to the connection. + m.connection.addSubscriber(m) + + // Save the subscription request + m.subscriptionRequests = append(m.subscriptionRequests, subscriptionRequest.(internalModel.DefaultPlcSubscriptionRequest)) + + // Just populate all requests with an OK + responseCodes := map[string]apiModel.PlcResponseCode{} + for _, fieldName := range subscriptionRequest.GetFieldNames() { + responseCodes[fieldName] = apiModel.PlcResponseCode_OK + } + + result <- apiModel.PlcSubscriptionRequestResult{ + Request: subscriptionRequest, + Response: internalModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes), + Err: nil, + } + }() + return result +} + +func (m *Subscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult { + result := make(chan apiModel.PlcUnsubscriptionRequestResult) + + // TODO: As soon as we establish a connection, we start getting data... + // subscriptions are more an internal handling of which values to pass where. + + return result +} diff --git a/plc4go/internal/plc4go/bacnetip/Driver.go b/plc4go/internal/plc4go/bacnetip/ValueHandler.go similarity index 81% copy from plc4go/internal/plc4go/bacnetip/Driver.go copy to plc4go/internal/plc4go/bacnetip/ValueHandler.go index 3620e61..982bc53 100644 --- a/plc4go/internal/plc4go/bacnetip/Driver.go +++ b/plc4go/internal/plc4go/bacnetip/ValueHandler.go @@ -19,8 +19,14 @@ package bacnetip -import "github.com/apache/plc4x/plc4go/pkg/plc4go" +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/values" +) -func NewDriver() plc4go.PlcDriver { - return nil +type ValueHandler struct { + values.IEC61131ValueHandler +} + +func NewValueHandler() ValueHandler { + return ValueHandler{} }
