This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new e8462caece feat(plc4go): streamline connect calls
e8462caece is described below
commit e8462caecea21e97a6c56498d312ac9582707b47
Author: Sebastian Rühl <[email protected]>
AuthorDate: Wed Apr 5 16:07:45 2023 +0200
feat(plc4go): streamline connect calls
+ Added new ConnectWithContext method
---
plc4go/internal/ads/Connection.go | 5 +----
plc4go/internal/ads/Driver.go | 14 +++++--------
plc4go/internal/bacnetip/Connection.go | 5 +++--
plc4go/internal/bacnetip/Driver.go | 15 ++++++--------
plc4go/internal/cbus/Connection.go | 4 +---
plc4go/internal/cbus/Driver.go | 13 +++++-------
plc4go/internal/eip/Connection.go | 4 +---
plc4go/internal/eip/EipDriver.go | 10 ++++++----
plc4go/internal/knxnetip/Connection.go | 6 ++++--
plc4go/internal/knxnetip/Driver.go | 14 +++++--------
plc4go/internal/modbus/ModbusAsciiDriver.go | 11 +++++-----
plc4go/internal/modbus/ModbusRtuDriver.go | 11 +++++-----
plc4go/internal/modbus/ModbusTcpDriver.go | 11 +++++-----
plc4go/internal/s7/Connection.go | 4 +---
plc4go/internal/s7/Driver.go | 10 ++++++----
plc4go/internal/simulated/Connection.go | 5 +++++
plc4go/internal/simulated/Driver.go | 20 +++++++------------
plc4go/internal/simulated/Driver_test.go | 4 +---
plc4go/pkg/api/cache/plc_connection_lease.go | 5 +++++
plc4go/pkg/api/connection.go | 3 +++
plc4go/pkg/api/driver.go | 3 +++
plc4go/spi/default/DefaultConnection.go | 9 ++++++++-
plc4go/spi/default/DefaultDriver.go | 30 ++++++++++++++++++----------
23 files changed, 113 insertions(+), 103 deletions(-)
diff --git a/plc4go/internal/ads/Connection.go
b/plc4go/internal/ads/Connection.go
index a1882eec0e..121b0ceab3 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -95,10 +95,7 @@ func (m *Connection) GetConnection() plc4go.PlcConnection {
return m
}
-func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- // TODO: use proper context
- ctx := context.TODO()
-
+func (m *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index ac657b7345..ba336f461e 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -40,12 +40,12 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("ads", "Beckhoff
TwinCat ADS", "tcp", NewTagHandler()),
- }
+ driver := &Driver{}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "ads",
"Beckhoff TwinCat ADS", "tcp", NewTagHandler())
+ return driver
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -90,17 +90,13 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
return ch
}
log.Debug().Stringer("connection", connection).Msg("created connection,
connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SupportsDiscovery() bool {
return true
}
-func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem),
discoveryOptions ...options.WithDiscoveryOption) error {
- return m.DiscoverWithContext(context.TODO(), callback,
discoveryOptions...)
-}
-
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/bacnetip/Connection.go
b/plc4go/internal/bacnetip/Connection.go
index 841b78d437..30113f1a14 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -20,6 +20,7 @@
package bacnetip
import (
+ "context"
"fmt"
"sync"
"time"
@@ -74,11 +75,11 @@ func (c *Connection) GetTracer() *spi.Tracer {
return c.tracer
}
-func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+func (c *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
- connectionConnectResult := <-c.DefaultConnection.Connect()
+ connectionConnectResult :=
<-c.DefaultConnection.ConnectWithContext(ctx)
go func() {
for c.IsConnected() {
log.Trace().Msg("Polling data")
diff --git a/plc4go/internal/bacnetip/Driver.go
b/plc4go/internal/bacnetip/Driver.go
index 0fef164050..2803a73c58 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -49,8 +49,7 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("bacnet-ip",
"BACnet/IP", "udp", NewTagHandler()),
+ driver := &Driver{
applicationManager: ApplicationManager{
applications:
map[string]*ApplicationLayerMessageCodec{},
},
@@ -58,11 +57,13 @@ func NewDriver() plc4go.PlcDriver {
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "bacnet-ip",
"BACnet/IP", "udp", NewTagHandler())
+ return driver
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
- // Get an the transport specified in the url
+ // Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We
couldn't find a transport for scheme %s", transportUrl.Scheme)
@@ -104,17 +105,13 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
// Create the new connection
connection := NewConnection(codec, m.GetPlcTagHandler(), &m.tm, options)
log.Debug().Msg("created connection, connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SupportsDiscovery() bool {
return true
}
-func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem),
discoveryOptions ...options.WithDiscoveryOption) error {
- return m.DiscoverWithContext(context.TODO(), callback,
discoveryOptions...)
-}
-
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/cbus/Connection.go
b/plc4go/internal/cbus/Connection.go
index 4afcd19648..91062f1baa 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -107,9 +107,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
return c.messageCodec
}
-func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- // TODO: use proper context
- ctx := context.TODO()
+func (c *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index c06de0fb9f..9dd7302765 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -43,15 +43,16 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("c-bus",
"Clipsal Bus", "tcp", NewTagHandler()),
+ driver := &Driver{
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "c-bus",
"Clipsal Bus", "tcp", NewTagHandler())
+ return driver
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -104,7 +105,7 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
// Create the new connection
connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
log.Debug().Msg("created connection, connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
@@ -119,10 +120,6 @@ func (m *Driver) SupportsDiscovery() bool {
return true
}
-func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem),
discoveryOptions ...options.WithDiscoveryOption) error {
- return m.DiscoverWithContext(context.TODO(), callback,
discoveryOptions...)
-}
-
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/eip/Connection.go
b/plc4go/internal/eip/Connection.go
index 4ef90872eb..b3c7ccd7ce 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -102,9 +102,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
return m.messageCodec
}
-func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- // TODO: use proper context
- ctx := context.TODO()
+func (m *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 44d7d3deca..0483d3b25e 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -20,6 +20,7 @@
package eip
import (
+ "context"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
@@ -38,15 +39,16 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("eip",
"EthernetIP", "tcp", NewTagHandler()),
+ driver := &Driver{
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "eip",
"EthernetIP", "tcp", NewTagHandler())
+ return driver
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -99,7 +101,7 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
// Create the new connection
connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
log.Debug().Msg("created connection, connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
diff --git a/plc4go/internal/knxnetip/Connection.go
b/plc4go/internal/knxnetip/Connection.go
index 9efc43ea4f..4d6ddcb7e1 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -216,8 +216,10 @@ func (m *Connection) GetTracer() *spi.Tracer {
}
func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- // TODO: use proper context
- ctx := context.TODO()
+ return m.ConnectWithContext(context.Background())
+}
+
+func (m *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
result := make(chan plc4go.PlcConnectionConnectResult)
sendResult := func(connection plc4go.PlcConnection, err error) {
result <-
_default.NewDefaultPlcConnectionConnectResult(connection, err)
diff --git a/plc4go/internal/knxnetip/Driver.go
b/plc4go/internal/knxnetip/Driver.go
index b41181e5ca..cede735647 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -37,9 +37,9 @@ type Driver struct {
}
func NewDriver() *Driver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("knxnet-ip",
"KNXNet/IP", "udp", NewTagHandler()),
- }
+ driver := &Driver{}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "knxnet-ip",
"KNXNet/IP", "udp", NewTagHandler())
+ return driver
}
func (m *Driver) CheckQuery(query string) error {
@@ -47,7 +47,7 @@ func (m *Driver) CheckQuery(query string) error {
return err
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
@@ -72,17 +72,13 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
// Create the new connection
connection := NewConnection(transportInstance, options,
m.GetPlcTagHandler())
log.Trace().Str("transport",
transportUrl.String()).Stringer("connection", connection).Msg("created new
connection instance, trying to connect now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SupportsDiscovery() bool {
return true
}
-func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem),
discoveryOptions ...options.WithDiscoveryOption) error {
- return m.DiscoverWithContext(context.TODO(), callback,
discoveryOptions...)
-}
-
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/modbus/ModbusAsciiDriver.go
b/plc4go/internal/modbus/ModbusAsciiDriver.go
index 4349936ff3..f7c8a697fa 100644
--- a/plc4go/internal/modbus/ModbusAsciiDriver.go
+++ b/plc4go/internal/modbus/ModbusAsciiDriver.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"encoding/json"
"github.com/apache/plc4x/plc4go/pkg/api"
"github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
@@ -36,12 +37,12 @@ type ModbusAsciiDriver struct {
}
func NewModbusAsciiDriver() *ModbusAsciiDriver {
- return &ModbusAsciiDriver{
- DefaultDriver: _default.NewDefaultDriver("modbus-ascii",
"Modbus ASCII", "serial", NewTagHandler()),
- }
+ driver := &ModbusAsciiDriver{}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver,
"modbus-ascii", "Modbus ASCII", "serial", NewTagHandler())
+ return driver
}
-func (m ModbusAsciiDriver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context,
transportUrl url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -98,5 +99,5 @@ func (m ModbusAsciiDriver) GetConnection(transportUrl
url.URL, transports map[st
// Create the new connection
connection := NewConnection(unitIdentifier, codec, options,
m.GetPlcTagHandler())
log.Debug().Stringer("connection", connection).Msg("created connection,
connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusRtuDriver.go
b/plc4go/internal/modbus/ModbusRtuDriver.go
index 83d5d10d9b..6e306c4239 100644
--- a/plc4go/internal/modbus/ModbusRtuDriver.go
+++ b/plc4go/internal/modbus/ModbusRtuDriver.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"encoding/json"
"github.com/apache/plc4x/plc4go/pkg/api"
"github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
@@ -36,12 +37,12 @@ type ModbusRtuDriver struct {
}
func NewModbusRtuDriver() *ModbusRtuDriver {
- return &ModbusRtuDriver{
- DefaultDriver: _default.NewDefaultDriver("modbus-rtu", "Modbus
RTU", "serial", NewTagHandler()),
- }
+ driver := &ModbusRtuDriver{}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "modbus-rtu",
"Modbus RTU", "serial", NewTagHandler())
+ return driver
}
-func (m ModbusRtuDriver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context,
transportUrl url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -98,5 +99,5 @@ func (m ModbusRtuDriver) GetConnection(transportUrl url.URL,
transports map[stri
// Create the new connection
connection := NewConnection(unitIdentifier, codec, options,
m.GetPlcTagHandler())
log.Debug().Stringer("connection", connection).Msg("created connection,
connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusTcpDriver.go
b/plc4go/internal/modbus/ModbusTcpDriver.go
index 6a82971b2b..d2f7e58a35 100644
--- a/plc4go/internal/modbus/ModbusTcpDriver.go
+++ b/plc4go/internal/modbus/ModbusTcpDriver.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"encoding/json"
"github.com/apache/plc4x/plc4go/pkg/api"
"github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
@@ -36,12 +37,12 @@ type ModbusTcpDriver struct {
}
func NewModbusTcpDriver() *ModbusTcpDriver {
- return &ModbusTcpDriver{
- DefaultDriver: _default.NewDefaultDriver("modbus-tcp", "Modbus
TCP", "tcp", NewTagHandler()),
- }
+ driver := &ModbusTcpDriver{}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "modbus-tcp",
"Modbus TCP", "tcp", NewTagHandler())
+ return driver
}
-func (m ModbusTcpDriver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context,
transportUrl url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -98,5 +99,5 @@ func (m ModbusTcpDriver) GetConnection(transportUrl url.URL,
transports map[stri
// Create the new connection
connection := NewConnection(unitIdentifier, codec, options,
m.GetPlcTagHandler())
log.Debug().Stringer("connection", connection).Msg("created connection,
connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 8db697b784..97e00e90a8 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -106,9 +106,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
return m.messageCodec
}
-func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- // TODO: use proper context
- ctx := context.TODO()
+func (m *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index e4049bad3b..edb3801100 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -20,6 +20,7 @@
package s7
import (
+ "context"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
@@ -38,15 +39,16 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("s7",
"Siemens S7 (Basic)", "tcp", NewTagHandler()),
+ driver := &Driver{
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "s7", "Siemens
S7 (Basic)", "tcp", NewTagHandler())
+ return driver
}
-func (m *Driver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl
url.URL, transports map[string]transports.Transport, options
map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get
connection for transport url with %d transport(s) and %d option(s)",
len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
@@ -99,7 +101,7 @@ func (m *Driver) GetConnection(transportUrl url.URL,
transports map[string]trans
// Create the new connection
connection := NewConnection(codec, configuration, driverContext,
m.GetPlcTagHandler(), &m.tm, options)
log.Debug().Msg("created connection, connecting now")
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
diff --git a/plc4go/internal/simulated/Connection.go
b/plc4go/internal/simulated/Connection.go
index 89a80545cd..2f364f3326 100644
--- a/plc4go/internal/simulated/Connection.go
+++ b/plc4go/internal/simulated/Connection.go
@@ -20,6 +20,7 @@
package simulated
import (
+ "context"
"strconv"
"time"
@@ -72,6 +73,10 @@ func (c *Connection) GetTracer() *spi.Tracer {
}
func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ return c.ConnectWithContext(context.Background())
+}
+
+func (c *Connection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
// Check if the connection was already connected
diff --git a/plc4go/internal/simulated/Driver.go
b/plc4go/internal/simulated/Driver.go
index 5a02e9c340..117c53865d 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -20,14 +20,12 @@
package simulated
import (
+ "context"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
- "github.com/apache/plc4x/plc4go/pkg/api/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
- "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
- "github.com/pkg/errors"
)
type Driver struct {
@@ -36,16 +34,16 @@ type Driver struct {
}
func NewDriver() plc4go.PlcDriver {
- return &Driver{
- DefaultDriver: _default.NewDefaultDriver("simulated",
"Simulated PLC4X Datasource", "none", NewTagHandler()),
- valueHandler: NewValueHandler(),
+ driver := &Driver{
+ valueHandler: NewValueHandler(),
}
+ driver.DefaultDriver = _default.NewDefaultDriver(driver, "simulated",
"Simulated PLC4X Datasource", "none", NewTagHandler())
+ return driver
}
-// GetConnection Establishes a connection to a given PLC using the information
in the connectionString
-func (d *Driver) GetConnection(_ url.URL, _ map[string]transports.Transport,
options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
connection := NewConnection(NewDevice("test"), d.GetPlcTagHandler(),
d.valueHandler, options)
- return connection.Connect()
+ return connection.ConnectWithContext(ctx)
}
// SupportsDiscovery returns true if this driver supports discovery
@@ -53,7 +51,3 @@ func (d *Driver) GetConnection(_ url.URL, _
map[string]transports.Transport, opt
func (d *Driver) SupportsDiscovery() bool {
return false
}
-
-func (d *Driver) Discover(_ func(event model.PlcDiscoveryItem), _
...options.WithDiscoveryOption) error {
- return errors.New("unsupported operation")
-}
diff --git a/plc4go/internal/simulated/Driver_test.go
b/plc4go/internal/simulated/Driver_test.go
index c2590acd35..c7f3f5c35e 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -86,9 +86,7 @@ func TestDriver_Discover(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
if err := d.Discover(tt.args.callback,
tt.args.discoveryOptions...); (err != nil) != tt.wantErr {
t.Errorf("Discover() error = %v, wantErr %v",
err, tt.wantErr)
}
diff --git a/plc4go/pkg/api/cache/plc_connection_lease.go
b/plc4go/pkg/api/cache/plc_connection_lease.go
index 12e85c3bfa..6389f4543f 100644
--- a/plc4go/pkg/api/cache/plc_connection_lease.go
+++ b/plc4go/pkg/api/cache/plc_connection_lease.go
@@ -20,6 +20,7 @@
package cache
import (
+ "context"
"fmt"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -75,6 +76,10 @@ func (t *plcConnectionLease) Connect() <-chan
plc4go.PlcConnectionConnectResult
panic("Called 'Connect' on a cached connection")
}
+func (t *plcConnectionLease) ConnectWithContext(_ context.Context) <-chan
plc4go.PlcConnectionConnectResult {
+ panic("Called 'Connect' on a cached connection")
+}
+
func (t *plcConnectionLease) BlockingClose() {
if t.connection == nil {
panic("Called 'BlockingClose' on a closed cached connection")
diff --git a/plc4go/pkg/api/connection.go b/plc4go/pkg/api/connection.go
index 40c0eeed0f..c6f61e8e62 100644
--- a/plc4go/pkg/api/connection.go
+++ b/plc4go/pkg/api/connection.go
@@ -20,6 +20,7 @@
package plc4go
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
)
@@ -40,6 +41,8 @@ type PlcConnectionPingResult interface {
type PlcConnection interface {
// Connect Initiate the connection to the PLC
Connect() <-chan PlcConnectionConnectResult
+ // ConnectWithContext connects connection codec with the supplied
context
+ ConnectWithContext(ctx context.Context) <-chan
PlcConnectionConnectResult
// BlockingClose Blocking variant of Close (for usage in "defer"
statements)
BlockingClose()
// Close the connection to the PLC (gracefully)
diff --git a/plc4go/pkg/api/driver.go b/plc4go/pkg/api/driver.go
index d543637847..293ddad294 100644
--- a/plc4go/pkg/api/driver.go
+++ b/plc4go/pkg/api/driver.go
@@ -46,6 +46,9 @@ type PlcDriver interface {
// GetConnection Establishes a connection to a given PLC using the
information in the connectionString
// FIXME: this leaks spi in the signature move to spi driver or create
interfaces. Can also be done by moving spi in a proper module
GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
PlcConnectionConnectResult
+ // GetConnectionWithContext Establishes a connection to a given PLC
using the information in the connectionString
+ // FIXME: this leaks spi in the signature move to spi driver or create
interfaces. Can also be done by moving spi in a proper module
+ GetConnectionWithContext(ctx context.Context, transportUrl url.URL,
transports map[string]transports.Transport, options map[string][]string) <-chan
PlcConnectionConnectResult
// SupportsDiscovery returns true if this driver supports discovery
SupportsDiscovery() bool
diff --git a/plc4go/spi/default/DefaultConnection.go
b/plc4go/spi/default/DefaultConnection.go
index 3dfc6f5ad3..1ce885e113 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -20,6 +20,7 @@
package _default
import (
+ "context"
"time"
"github.com/apache/plc4x/plc4go/spi/options"
@@ -39,6 +40,8 @@ type DefaultConnectionRequirements interface {
GetConnection() plc4go.PlcConnection
// GetMessageCodec should return the spi.MessageCodec in use
GetMessageCodec() spi.MessageCodec
+ // ConnectWithContext is declared here for Connect redirection
+ ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult
}
// DefaultConnection should be used as an embedded struct. All defined methods
here have default implementations
@@ -226,10 +229,14 @@ func (d *defaultConnection) SetConnected(connected bool) {
}
func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult
{
+ return
d.DefaultConnectionRequirements.ConnectWithContext(context.Background())
+}
+
+func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan
plc4go.PlcConnectionConnectResult {
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
- err := d.GetMessageCodec().Connect()
+ err := d.GetMessageCodec().ConnectWithContext(ctx)
d.SetConnected(true)
connection := d.GetConnection()
ch <- NewDefaultPlcConnectionConnectResult(connection, err)
diff --git a/plc4go/spi/default/DefaultDriver.go
b/plc4go/spi/default/DefaultDriver.go
index 33ad563a16..352eab5a84 100644
--- a/plc4go/spi/default/DefaultDriver.go
+++ b/plc4go/spi/default/DefaultDriver.go
@@ -22,6 +22,7 @@ package _default
import (
"context"
"fmt"
+ "github.com/pkg/errors"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
@@ -31,6 +32,11 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
)
+type DefaultDriverRequirements interface {
+ GetConnectionWithContext(ctx context.Context, transportUrl url.URL,
transports map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult
+ DiscoverWithContext(callback context.Context, event func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error
+}
+
type DefaultDriver interface {
fmt.Stringer
plc4go.PlcDriver
@@ -38,12 +44,13 @@ type DefaultDriver interface {
GetPlcTagHandler() spi.PlcTagHandler
}
-func NewDefaultDriver(protocolCode string, protocolName string,
defaultTransport string, plcTagHandler spi.PlcTagHandler) DefaultDriver {
+func NewDefaultDriver(defaultDriverRequirements DefaultDriverRequirements,
protocolCode string, protocolName string, defaultTransport string,
plcTagHandler spi.PlcTagHandler) DefaultDriver {
return &defaultDriver{
- protocolCode: protocolCode,
- protocolName: protocolName,
- defaultTransport: defaultTransport,
- plcTagHandler: plcTagHandler,
+ DefaultDriverRequirements: defaultDriverRequirements,
+ protocolCode: protocolCode,
+ protocolName: protocolName,
+ defaultTransport: defaultTransport,
+ plcTagHandler: plcTagHandler,
}
}
@@ -54,6 +61,7 @@ func NewDefaultDriver(protocolCode string, protocolName
string, defaultTransport
//
type defaultDriver struct {
+ DefaultDriverRequirements
protocolCode string
protocolName string
defaultTransport string
@@ -88,20 +96,20 @@ func (d *defaultDriver) CheckQuery(query string) error {
return err
}
-func (d *defaultDriver) GetConnection(_ url.URL, _
map[string]transports.Transport, _ map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
- panic("implement me")
+func (d *defaultDriver) GetConnection(transportUrl url.URL, transports
map[string]transports.Transport, options map[string][]string) <-chan
plc4go.PlcConnectionConnectResult {
+ return d.GetConnectionWithContext(context.Background(), transportUrl,
transports, options)
}
func (d *defaultDriver) SupportsDiscovery() bool {
return false
}
-func (d *defaultDriver) Discover(_ func(event apiModel.PlcDiscoveryItem), _
...options.WithDiscoveryOption) error {
- panic("not available")
+func (d *defaultDriver) Discover(callback func(event
apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption)
error {
+ return d.DiscoverWithContext(context.Background(), callback,
discoveryOptions...)
}
-func (d *defaultDriver) DiscoverWithContext(_ context.Context, callback
func(event apiModel.PlcDiscoveryItem), discoveryOptions
...options.WithDiscoveryOption) error {
- panic("not available")
+func (d *defaultDriver) DiscoverWithContext(_ context.Context, _ func(event
apiModel.PlcDiscoveryItem), _ ...options.WithDiscoveryOption) error {
+ return errors.New("not available")
}
func (d *defaultDriver) GetPlcTagHandler() spi.PlcTagHandler {