This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit f3454c92acd28ce2ac48e5b3cf3f91089be06359 Author: Sebastian Rühl <[email protected]> AuthorDate: Fri Apr 16 15:32:40 2021 +0200 plc4go: introduced a DefaultConnection which can be use for most PlcConnection implementations + introduced a _default.DefaultConnectionMetadata + refactored ads/modbus/s7 to use new DefaultConnection --- plc4go/internal/plc4go/ads/Connection.go | 100 ++------ plc4go/internal/plc4go/modbus/Connection.go | 131 +++------- plc4go/internal/plc4go/s7/Connection.go | 129 +++------- plc4go/internal/plc4go/s7/Driver.go | 2 +- .../plc4go/spi/default/DefaultConnection.go | 267 +++++++++++++++++++++ plc4go/internal/plc4go/spi/default/init.go | 21 ++ 6 files changed, 370 insertions(+), 280 deletions(-) diff --git a/plc4go/internal/plc4go/ads/Connection.go b/plc4go/internal/plc4go/ads/Connection.go index 3061dd0..5f97150 100644 --- a/plc4go/internal/plc4go/ads/Connection.go +++ b/plc4go/internal/plc4go/ads/Connection.go @@ -22,43 +22,17 @@ package ads import ( "fmt" "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors" internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" "github.com/apache/plc4x/plc4go/pkg/plc4go" apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model" - "github.com/rs/zerolog/log" - "time" ) -type ConnectionMetadata struct { -} - -func (m *ConnectionMetadata) GetConnectionAttributes() map[string]string { - return map[string]string{} -} - -func (m *ConnectionMetadata) CanRead() bool { - return true -} - -func (m *ConnectionMetadata) CanWrite() bool { - return true -} - -func (m *ConnectionMetadata) CanSubscribe() bool { - return true -} - -func (m *ConnectionMetadata) CanBrowse() bool { - return false -} - -// TODO: maybe we can use a DefaultConnection struct here with delegates type Connection struct { + _default.DefaultConnection messageCodec spi.MessageCodec - fieldHandler spi.PlcFieldHandler - valueHandler spi.PlcValueHandler requestInterceptor internalModel.RequestInterceptor configuration Configuration reader *Reader @@ -81,65 +55,41 @@ func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, f configuration.sourceAmsPort, &reader, ) - return &Connection{ + connection := &Connection{ messageCodec: messageCodec, - fieldHandler: fieldHandler, - valueHandler: NewValueHandler(), requestInterceptor: interceptors.NewSingleItemRequestInterceptor(), reader: &reader, writer: &writer, - }, nil -} - -func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult { - log.Trace().Msg("Connecting") - ch := make(chan plc4go.PlcConnectionConnectResult) - go func() { - err := m.messageCodec.Connect() - ch <- plc4go.NewPlcConnectionConnectResult(m, err) - }() - return ch -} - -func (m *Connection) BlockingClose() { - log.Trace().Msg("Closing blocked") - closeResults := m.Close() - select { - case <-closeResults: - return - case <-time.After(time.Second * 5): - return } + connection.DefaultConnection = _default.NewDefaultConnection(connection, + _default.WithPlcFieldHandler(fieldHandler), + _default.WithPlcValueHandler(NewValueHandler()), + ) + return connection, nil } -func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult { - log.Trace().Msg("Close") - // TODO: Implement ... - ch := make(chan plc4go.PlcConnectionCloseResult) - go func() { - ch <- plc4go.NewPlcConnectionCloseResult(m, nil) - }() - return ch +func (m *Connection) GetConnection() plc4go.PlcConnection { + return m } -func (m *Connection) IsConnected() bool { - panic("implement me") -} - -func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult { - panic("implement me") +func (m *Connection) GetMessageCodec() spi.MessageCodec { + return m.messageCodec } func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata { - return &ConnectionMetadata{} + return _default.DefaultConnectionMetadata{ + ProvidesReading: true, + ProvidesWriting: true, + ProvidesSubscribing: true, + } } func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { - return internalModel.NewDefaultPlcReadRequestBuilder(m.fieldHandler, m.reader) + return internalModel.NewDefaultPlcReadRequestBuilder(m.GetPlcFieldHandler(), m.reader) } func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { - return internalModel.NewDefaultPlcWriteRequestBuilder(m.fieldHandler, m.valueHandler, m.writer) + return internalModel.NewDefaultPlcWriteRequestBuilder(m.GetPlcFieldHandler(), m.GetPlcValueHandler(), m.writer) } func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { @@ -150,10 +100,6 @@ func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRe panic("implement me") } -func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder { - panic("implement me") -} - func (m *Connection) GetTransportInstance() transports.TransportInstance { if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok { return mc.GetTransportInstance() @@ -161,14 +107,6 @@ func (m *Connection) GetTransportInstance() transports.TransportInstance { return nil } -func (m *Connection) GetPlcFieldHandler() spi.PlcFieldHandler { - return m.fieldHandler -} - -func (m *Connection) GetPlcValueHandler() spi.PlcValueHandler { - return m.valueHandler -} - func (m *Connection) String() string { return fmt.Sprintf("ads.Connection{}") } diff --git a/plc4go/internal/plc4go/modbus/Connection.go b/plc4go/internal/plc4go/modbus/Connection.go index c781276..a52aab8 100644 --- a/plc4go/internal/plc4go/modbus/Connection.go +++ b/plc4go/internal/plc4go/modbus/Connection.go @@ -23,9 +23,9 @@ import ( "fmt" readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/modbus/readwrite/model" "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors" internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model" - "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" "github.com/apache/plc4x/plc4go/pkg/plc4go" apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model" "github.com/pkg/errors" @@ -33,86 +33,38 @@ import ( "time" ) -type ConnectionMetadata struct { -} - -func (m ConnectionMetadata) GetConnectionAttributes() map[string]string { - return map[string]string{} -} - -func (m ConnectionMetadata) CanRead() bool { - return true -} - -func (m ConnectionMetadata) CanWrite() bool { - return true -} - -func (m ConnectionMetadata) CanSubscribe() bool { - return false -} - -func (m ConnectionMetadata) CanBrowse() bool { - return false -} - -// TODO: maybe we can use a DefaultConnection struct here with delegates type Connection struct { + _default.DefaultConnection unitIdentifier uint8 messageCodec spi.MessageCodec options map[string][]string - fieldHandler spi.PlcFieldHandler - valueHandler spi.PlcValueHandler requestInterceptor internalModel.RequestInterceptor } -func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options map[string][]string, fieldHandler spi.PlcFieldHandler) Connection { - return Connection{ +func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options map[string][]string, fieldHandler spi.PlcFieldHandler) *Connection { + connection := &Connection{ unitIdentifier: unitIdentifier, messageCodec: messageCodec, options: options, - fieldHandler: fieldHandler, - valueHandler: NewValueHandler(), requestInterceptor: interceptors.NewSingleItemRequestInterceptor(), } + connection.DefaultConnection = _default.NewDefaultConnection(connection, + _default.WithDefaultTtl(time.Second*5), + _default.WithPlcFieldHandler(fieldHandler), + _default.WithPlcValueHandler(NewValueHandler()), + ) + return connection } -func (m Connection) Connect() <-chan plc4go.PlcConnectionConnectResult { - log.Trace().Msg("Connecting") - ch := make(chan plc4go.PlcConnectionConnectResult) - go func() { - err := m.messageCodec.Connect() - ch <- plc4go.NewPlcConnectionConnectResult(m, err) - }() - return ch +func (m *Connection) GetConnection() plc4go.PlcConnection { + return m } -func (m Connection) BlockingClose() { - log.Trace().Msg("Closing blocked") - closeResults := m.Close() - select { - case <-closeResults: - return - case <-time.After(time.Second * 5): - return - } +func (m *Connection) GetMessageCodec() spi.MessageCodec { + return m.messageCodec } -func (m Connection) Close() <-chan plc4go.PlcConnectionCloseResult { - log.Trace().Msg("Close") - // TODO: Implement ... - ch := make(chan plc4go.PlcConnectionCloseResult) - go func() { - ch <- plc4go.NewPlcConnectionCloseResult(m, nil) - }() - return ch -} - -func (m Connection) IsConnected() bool { - panic("implement me") -} - -func (m Connection) Ping() <-chan plc4go.PlcConnectionPingResult { +func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult { log.Trace().Msg("Pinging") result := make(chan plc4go.PlcConnectionPingResult) go func() { @@ -148,47 +100,30 @@ func (m Connection) Ping() <-chan plc4go.PlcConnectionPingResult { return result } -func (m Connection) GetMetadata() apiModel.PlcConnectionMetadata { - return ConnectionMetadata{} -} - -func (m Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { - return internalModel.NewDefaultPlcReadRequestBuilderWithInterceptor(m.fieldHandler, - NewReader(m.unitIdentifier, m.messageCodec), m.requestInterceptor) -} - -func (m Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { - return internalModel.NewDefaultPlcWriteRequestBuilder( - m.fieldHandler, m.valueHandler, NewWriter(m.unitIdentifier, m.messageCodec)) -} - -func (m Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { - panic("implement me") -} - -func (m Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder { - panic("implement me") -} - -func (m Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder { - panic("implement me") -} - -func (m Connection) GetTransportInstance() transports.TransportInstance { - if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok { - return mc.GetTransportInstance() +func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata { + return _default.DefaultConnectionMetadata{ + ProvidesReading: true, + ProvidesWriting: true, } - return nil } -func (m Connection) GetPlcFieldHandler() spi.PlcFieldHandler { - return m.fieldHandler +func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { + return internalModel.NewDefaultPlcReadRequestBuilderWithInterceptor( + m.GetPlcFieldHandler(), + NewReader(m.unitIdentifier, m.messageCodec), + m.requestInterceptor, + ) } -func (m Connection) GetPlcValueHandler() spi.PlcValueHandler { - return m.valueHandler +func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { + // TODO: don't we need a interceptor here? + return internalModel.NewDefaultPlcWriteRequestBuilder( + m.GetPlcFieldHandler(), + m.GetPlcValueHandler(), + NewWriter(m.unitIdentifier, m.messageCodec), + ) } -func (m Connection) String() string { +func (m *Connection) String() string { return fmt.Sprintf("modbus.Connection{unitIdentifier: %d}", m.unitIdentifier) } diff --git a/plc4go/internal/plc4go/s7/Connection.go b/plc4go/internal/plc4go/s7/Connection.go index f6d47c3..9ca0f20 100644 --- a/plc4go/internal/plc4go/s7/Connection.go +++ b/plc4go/internal/plc4go/s7/Connection.go @@ -23,9 +23,9 @@ import ( "fmt" readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/s7/readwrite/model" "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/default" internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/plcerrors" - "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" "github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils" "github.com/apache/plc4x/plc4go/pkg/plc4go" apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model" @@ -34,32 +34,8 @@ import ( "reflect" "strings" "sync" - "time" ) -type ConnectionMetadata struct { -} - -func (m ConnectionMetadata) GetConnectionAttributes() map[string]string { - return map[string]string{} -} - -func (m ConnectionMetadata) CanRead() bool { - return true -} - -func (m ConnectionMetadata) CanWrite() bool { - return true -} - -func (m ConnectionMetadata) CanSubscribe() bool { - return false -} - -func (m ConnectionMetadata) CanBrowse() bool { - return false -} - type TpduGenerator struct { currentTpduId uint16 lock sync.Mutex @@ -77,30 +53,36 @@ func (t *TpduGenerator) getAndIncrement() uint16 { return result } -// TODO: maybe we can use a DefaultConnection struct here with delegates type Connection struct { + _default.DefaultConnection tpduGenerator TpduGenerator messageCodec spi.MessageCodec configuration Configuration driverContext DriverContext - fieldHandler spi.PlcFieldHandler - valueHandler spi.PlcValueHandler - defaultTtl time.Duration tm *spi.RequestTransactionManager - connected bool } -func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, fieldHandler spi.PlcFieldHandler, tm *spi.RequestTransactionManager) Connection { - return Connection{ +func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, fieldHandler spi.PlcFieldHandler, tm *spi.RequestTransactionManager) *Connection { + connection := &Connection{ tpduGenerator: TpduGenerator{currentTpduId: 10}, messageCodec: messageCodec, configuration: configuration, driverContext: driverContext, - fieldHandler: fieldHandler, - valueHandler: NewValueHandler(), - defaultTtl: time.Second * 10, tm: tm, } + connection.DefaultConnection = _default.NewDefaultConnection(connection, + _default.WithPlcFieldHandler(fieldHandler), + _default.WithPlcValueHandler(NewValueHandler()), + ) + return connection +} + +func (m *Connection) GetConnection() plc4go.PlcConnection { + return m +} + +func (m *Connection) GetMessageCodec() spi.MessageCodec { + return m.messageCodec } func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult { @@ -124,8 +106,9 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult { go m.setupConnection(ch) log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!") // Here we write directly and don't wait till the connection is "really" connected + // Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete ch <- plc4go.NewPlcConnectionConnectResult(m, err) - m.connected = true + m.SetConnected(true) return } @@ -167,7 +150,7 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request") return nil }, - m.defaultTtl, + m.GetTtl(), ); err != nil { m.fireConnectionError(errors.Wrap(err, "Error during sending of COTP Connection Request"), ch) } @@ -214,7 +197,7 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request") return nil }, - m.defaultTtl, + m.GetTtl(), ); err != nil { m.fireConnectionError(errors.Wrap(err, "Error during sending of S7 Connection Request"), ch) } @@ -279,7 +262,7 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request") return nil }, - m.defaultTtl, + m.GetTtl(), ); err != nil { m.fireConnectionError(errors.Wrap(err, "Error during sending of identify remote Request"), ch) } @@ -312,7 +295,7 @@ func (m *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) } else { log.Info().Msg("Successfully connected") } - m.connected = true + m.SetConnected(true) } func (m *Connection) extractControllerTypeAndFireConnected(payloadUserData *readWriteModel.S7PayloadUserData, ch chan<- plc4go.PlcConnectionConnectResult) { @@ -434,74 +417,20 @@ func (m *Connection) createCOTPConnectionRequest() *readWriteModel.COTPPacket { ) } -func (m *Connection) BlockingClose() { - log.Trace().Msg("Closing blocked") - closeResults := m.Close() - select { - case <-closeResults: - return - case <-time.After(time.Second * 5): - return - } -} - -func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult { - log.Trace().Msg("Close") - m.connected = false - // TODO: Implement ... - ch := make(chan plc4go.PlcConnectionCloseResult) - go func() { - ch <- plc4go.NewPlcConnectionCloseResult(m, nil) - }() - return ch -} - -func (m *Connection) IsConnected() bool { - return m.connected -} - -func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult { - panic("Not implemented") -} - func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata { - return ConnectionMetadata{} + return _default.DefaultConnectionMetadata{ + ProvidesReading: true, + ProvidesWriting: true, + } } func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder { - return internalModel.NewDefaultPlcReadRequestBuilder(m.fieldHandler, NewReader(&m.tpduGenerator, m.messageCodec, m.tm)) + return internalModel.NewDefaultPlcReadRequestBuilder(m.GetPlcFieldHandler(), NewReader(&m.tpduGenerator, m.messageCodec, m.tm)) } func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder { return internalModel.NewDefaultPlcWriteRequestBuilder( - m.fieldHandler, m.valueHandler, NewWriter(&m.tpduGenerator, m.messageCodec, m.tm)) -} - -func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder { - panic("implement me") -} - -func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder { - panic("implement me") -} - -func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder { - panic("implement me") -} - -func (m *Connection) GetTransportInstance() transports.TransportInstance { - if mc, ok := m.messageCodec.(spi.TransportInstanceExposer); ok { - return mc.GetTransportInstance() - } - return nil -} - -func (m *Connection) GetPlcFieldHandler() spi.PlcFieldHandler { - return m.fieldHandler -} - -func (m *Connection) GetPlcValueHandler() spi.PlcValueHandler { - return m.valueHandler + m.GetPlcFieldHandler(), m.GetPlcValueHandler(), NewWriter(&m.tpduGenerator, m.messageCodec, m.tm)) } func (m *Connection) String() string { diff --git a/plc4go/internal/plc4go/s7/Driver.go b/plc4go/internal/plc4go/s7/Driver.go index 7bbdc19c..9d99762 100644 --- a/plc4go/internal/plc4go/s7/Driver.go +++ b/plc4go/internal/plc4go/s7/Driver.go @@ -114,7 +114,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans // Create the new connection connection := NewConnection(codec, configuration, driverContext, m.fieldHandler, &m.tm) - log.Info().Stringer("connection", &connection).Msg("created connection, connecting now") + log.Info().Msg("created connection, connecting now") return connection.Connect() } diff --git a/plc4go/internal/plc4go/spi/default/DefaultConnection.go b/plc4go/internal/plc4go/spi/default/DefaultConnection.go new file mode 100644 index 0000000..03a809e --- /dev/null +++ b/plc4go/internal/plc4go/spi/default/DefaultConnection.go @@ -0,0 +1,267 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package _default + +import ( + "github.com/apache/plc4x/plc4go/internal/plc4go/spi" + "github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/pkg/plc4go" + "github.com/apache/plc4x/plc4go/pkg/plc4go/model" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + "time" +) + +// DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection +// additional options can be set using the functions returning WithOption (e.g. WithDefaultTtl, WithPlcFieldHandler...) +type DefaultConnectionRequirements interface { + // GetConnection should return the implementing connection when using DefaultConnection + GetConnection() plc4go.PlcConnection + // GetMessageCodec should return the spi.MessageCodec in use + GetMessageCodec() spi.MessageCodec +} + +// DefaultConnection should be used as an embedded struct. All defined methods here have default implementations +type DefaultConnection interface { + plc4go.PlcConnection + spi.TransportInstanceExposer + spi.HandlerExposer + SetConnected(connected bool) + GetTtl() time.Duration +} + +// NewDefaultConnection is the factory for a DefaultConnection +func NewDefaultConnection(requirements DefaultConnectionRequirements, options ...WithOption) DefaultConnection { + return buildDefaultConnection(requirements, options...) +} + +// WithOption is a marker interface for options supplied by the builders like WithDefaultTtl +type WithOption interface { + isOption() bool +} + +// WithDefaultTtl ttl is time.Second * 10 by default +func WithDefaultTtl(defaultTtl time.Duration) WithOption { + return withDefaultTtl{defaultTtl: defaultTtl} +} + +func WithPlcFieldHandler(plcFieldHandler spi.PlcFieldHandler) WithOption { + return withPlcFieldHandler{plcFieldHandler: plcFieldHandler} +} + +func WithPlcValueHandler(plcValueHandler spi.PlcValueHandler) WithOption { + return withPlcValueHandler{plcValueHandler: plcValueHandler} +} + +// DefaultConnectionMetadata implements the model.PlcConnectionMetadata interface +type DefaultConnectionMetadata struct { + ConnectionAttributes map[string]string + ProvidesReading bool + ProvidesWriting bool + ProvidesSubscribing bool + ProvidesBrowsing bool +} + +// Internal + +type option struct { +} + +func (_ option) isOption() bool { + return true +} + +type withDefaultTtl struct { + option + // defaultTtl the time to live after a close + defaultTtl time.Duration +} + +type withPlcFieldHandler struct { + option + plcFieldHandler spi.PlcFieldHandler +} + +type withPlcValueHandler struct { + option + plcValueHandler spi.PlcValueHandler +} + +type defaultConnection struct { + DefaultConnectionRequirements + // defaultTtl the time to live after a close + defaultTtl time.Duration + // connected indicates if a connection is connected + connected bool + fieldHandler spi.PlcFieldHandler + valueHandler spi.PlcValueHandler +} + +func buildDefaultConnection(requirements DefaultConnectionRequirements, options ...WithOption) DefaultConnection { + defaultTtl := time.Second * 10 + var fieldHandler spi.PlcFieldHandler + var valueHandler spi.PlcValueHandler + + for _, option := range options { + if !option.isOption() { + panic("not a option") + } + switch option.(type) { + case withDefaultTtl: + defaultTtl = option.(withDefaultTtl).defaultTtl + log.Debug() + case withPlcFieldHandler: + fieldHandler = option.(withPlcFieldHandler).plcFieldHandler + case withPlcValueHandler: + valueHandler = option.(withPlcValueHandler).plcValueHandler + } + } + + return &defaultConnection{ + requirements, + defaultTtl, + false, + fieldHandler, + valueHandler, + } +} + +func (d *defaultConnection) SetConnected(connected bool) { + d.connected = connected +} + +func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult { + log.Trace().Msg("Connecting") + ch := make(chan plc4go.PlcConnectionConnectResult) + go func() { + err := d.GetMessageCodec().Connect() + d.SetConnected(true) + connection := d.GetConnection() + ch <- plc4go.NewPlcConnectionConnectResult(connection, err) + }() + return ch +} + +func (d *defaultConnection) BlockingClose() { + log.Trace().Msg("blocking close connection") + closeResults := d.GetConnection().Close() + d.SetConnected(false) + select { + case <-closeResults: + return + case <-time.After(d.GetTtl()): + return + } +} + +func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult { + log.Trace().Msg("close connection") + d.SetConnected(false) + ch := make(chan plc4go.PlcConnectionCloseResult) + go func() { + ch <- plc4go.NewPlcConnectionCloseResult(d.GetConnection(), nil) + }() + return ch +} + +func (d *defaultConnection) IsConnected() bool { + return d.connected +} + +func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult { + ch := make(chan plc4go.PlcConnectionPingResult) + go func() { + if d.GetConnection().IsConnected() { + ch <- plc4go.NewPlcConnectionPingResult(nil) + } else { + ch <- plc4go.NewPlcConnectionPingResult(errors.New("not connected")) + } + }() + return ch +} + +func (d *defaultConnection) GetTtl() time.Duration { + return d.defaultTtl +} + +func (d *defaultConnection) GetMetadata() model.PlcConnectionMetadata { + return DefaultConnectionMetadata{ + ConnectionAttributes: nil, + ProvidesReading: false, + ProvidesWriting: false, + ProvidesSubscribing: false, + ProvidesBrowsing: false, + } +} + +func (d *defaultConnection) ReadRequestBuilder() model.PlcReadRequestBuilder { + panic("not implemented") +} + +func (d *defaultConnection) WriteRequestBuilder() model.PlcWriteRequestBuilder { + panic("not implemented") +} + +func (d *defaultConnection) SubscriptionRequestBuilder() model.PlcSubscriptionRequestBuilder { + panic("not implemented") +} + +func (d *defaultConnection) UnsubscriptionRequestBuilder() model.PlcUnsubscriptionRequestBuilder { + panic("not implemented") +} + +func (d *defaultConnection) BrowseRequestBuilder() model.PlcBrowseRequestBuilder { + panic("not implemented") +} + +func (d *defaultConnection) GetTransportInstance() transports.TransportInstance { + if mc, ok := d.GetMessageCodec().(spi.TransportInstanceExposer); ok { + return mc.GetTransportInstance() + } + return nil +} + +func (d *defaultConnection) GetPlcFieldHandler() spi.PlcFieldHandler { + return d.fieldHandler +} + +func (d *defaultConnection) GetPlcValueHandler() spi.PlcValueHandler { + return d.valueHandler +} + +func (m DefaultConnectionMetadata) GetConnectionAttributes() map[string]string { + return m.ConnectionAttributes +} + +func (m DefaultConnectionMetadata) CanRead() bool { + return m.ProvidesReading +} + +func (m DefaultConnectionMetadata) CanWrite() bool { + return m.ProvidesWriting +} + +func (m DefaultConnectionMetadata) CanSubscribe() bool { + return m.ProvidesSubscribing +} + +func (m DefaultConnectionMetadata) CanBrowse() bool { + return m.ProvidesBrowsing +} diff --git a/plc4go/internal/plc4go/spi/default/init.go b/plc4go/internal/plc4go/spi/default/init.go new file mode 100644 index 0000000..adba704 --- /dev/null +++ b/plc4go/internal/plc4go/spi/default/init.go @@ -0,0 +1,21 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +// Package _default contains default implementations for interfaces defined within the spi +package _default
