This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new cabc35dd35 fix(plc4go/cbus): limit discoverer with semaphore
cabc35dd35 is described below
commit cabc35dd353132b8cecdf1438cd0c98f43d85957
Author: Sebastian Rühl <[email protected]>
AuthorDate: Tue Mar 21 11:52:04 2023 +0100
fix(plc4go/cbus): limit discoverer with semaphore
---
plc4go/internal/cbus/Discoverer.go | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 2d3da0f9c1..786fa5930f 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
+ "golang.org/x/sync/semaphore"
"net"
"net/url"
"sync"
@@ -30,7 +31,6 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -40,11 +40,13 @@ import (
)
type Discoverer struct {
- messageCodec spi.MessageCodec
+ maxConcurrency int64
}
func NewDiscoverer() *Discoverer {
- return &Discoverer{}
+ return &Discoverer{
+ maxConcurrency: 50,
+ }
}
func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
@@ -74,6 +76,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
transportInstances := make(chan transports.TransportInstance)
+ sem := semaphore.NewWeighted(d.maxConcurrency)
wg := &sync.WaitGroup{}
// Iterate over all network devices of this system.
for _, netInterface := range interfaces {
@@ -105,7 +108,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
if ipv4Addr == nil || ipv4Addr.IsLoopback() {
continue
}
- addresses, err := utils.GetIPAddresses(context.TODO(), netInterface, false)
+ addresses, err := utils.GetIPAddresses(ctx, netInterface, false)
if err != nil {
log.Warn().Err(err).Msgf("Can't get addresses for %v", netInterface)
continue
@@ -117,7 +120,12 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
log.Trace().Msgf("Handling found ip %v", ip)
wg.Add(1)
go func(ip net.IP) {
- defer func() { wg.Done() }()
+ if err := sem.Acquire(ctx, 1); err != nil {
+ log.Debug().Err(err).Msg("Error acquiring")
+ return
+ }
+ defer sem.Release(1)
+ defer wg.Done()
// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
var connectionUrl url.URL
{
@@ -225,7 +233,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
var remoteUrl url.URL
{
- // TODO: we could check for the exact reponse
+ // TODO: we could check for the exact response
remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
if err != nil {
log.Error().Err(err).Msg("Error creating url")
@@ -233,7 +241,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
remoteUrl = *remoteUrlParse
}
- // TODO: manufaturer + type would be good but this means two requests then
+ // TODO: manufacturer + type would be good but this means two requests then
deviceName := identifyReplyCommand.GetManufacturerName()
discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
ProtocolCode: "c-bus",