This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 2872c716f8d6182b82d8a6f35ed62b98bbd36b19 Author: Sebastian Rühl <[email protected]> AuthorDate: Fri Nov 14 10:02:21 2025 +0100 feat(plc4go)!: simplify cache --- plc4go/pkg/api/cache/PlcConnectionCache.go | 107 ++++----- plc4go/pkg/api/cache/PlcConnectionCache_test.go | 279 +++++++---------------- plc4go/pkg/api/cache/common.go | 35 --- plc4go/pkg/api/cache/connectionContainer.go | 43 ++-- plc4go/pkg/api/cache/connectionContainer_test.go | 10 +- plc4go/pkg/api/cache/mocks_test.go | 133 +---------- plc4go/pkg/api/cache/plcConnectionLease.go | 9 +- plc4go/pkg/api/cache/plcConnectionLease_test.go | 28 +-- 8 files changed, 187 insertions(+), 457 deletions(-) diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go b/plc4go/pkg/api/cache/PlcConnectionCache.go index fb181aed70..353b482211 100644 --- a/plc4go/pkg/api/cache/PlcConnectionCache.go +++ b/plc4go/pkg/api/cache/PlcConnectionCache.go @@ -37,7 +37,7 @@ import ( type PlcConnectionCache interface { GetConnection(ctx context.Context, connectionString string) (plc4go.PlcConnection, error) - Close() <-chan PlcConnectionCacheCloseResult + Close() error } func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager, withConnectionCacheOptions ...WithConnectionCacheOption) PlcConnectionCache { @@ -177,7 +177,7 @@ func (c *plcConnectionCache) GetConnection(ctx context.Context, connectionString if c.tracer != nil { txId = c.tracer.AddTransactionalStartTrace("get-connection", "lease") } - connChan, errChan := connection.lease() + connChan, errChan := connection.lease(ctx) maximumWaitTimeout := time.NewTimer(c.maxWaitTime) select { case <-ctx.Done(): // abort on context cancel @@ -216,71 +216,58 @@ func (c *plcConnectionCache) GetConnection(ctx context.Context, connectionString } } -func (c *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult { +func (c *plcConnectionCache) Close() error { c.log.Debug().Msg("Closing connection cache started.") - ch := make(chan PlcConnectionCacheCloseResult, 1) - - c.wg.Go(func() { - c.log.Trace().Msg("Acquire lock") - c.cacheLock.Lock() - defer c.cacheLock.Unlock() - c.log.Trace().Msg("lock acquired") + c.log.Trace().Msg("Acquire lock") + c.cacheLock.Lock() + defer c.cacheLock.Unlock() + c.log.Trace().Msg("lock acquired") - if len(c.connections) == 0 { - select { - case ch <- newDefaultPlcConnectionCacheCloseResult(c, nil): - default: - c.log.Trace().Msg("Channel full, dropping response") - } - c.log.Debug().Msg("Closing connection cache finished.") - return - } + if len(c.connections) == 0 { + c.log.Debug().Msg("Closing connection cache finished.") + return nil + } - for _, connectionContainer := range c.connections { - ccLog := c.log.With().Stringer("connectionContainer", connectionContainer).Logger() - ccLog.Trace().Msg("Closing connection") - // Mark the connection as being closed to not try to re-establish it. - connectionContainer.closed = true - // Try to get a lease as this way we kow we're not closing the connection - // while some go func is still using it. - c.wg.Go(func() { - ccLog.Trace().Msg("getting a lease") - connChan, errChan := connectionContainer.lease() - closeTimeout := time.NewTimer(c.maxWaitTime) - select { - // We're just getting the lease as this way we can be sure nobody else is using it. - // We also really don't care if it worked, or not ... it's just an attempt of being - // nice. - case _ = <-connChan: - ccLog.Debug().Msg("Gracefully closing connection ...") - // Give back the connection. - if connectionContainer.connection != nil { - ccLog.Trace().Msg("closing actual connection") - connectionContainer.connection.Close() - } - case err := <-errChan: - ccLog.Debug().Err(err).Msg("Error while trying to get lease on connection, ignoring.") - // If we're timing out brutally kill the connection. - case <-closeTimeout.C: - ccLog.Debug().Msg("Forcefully closing connection ...") - // Forcefully close this connection. - if connectionContainer.connection != nil { - connectionContainer.connection.Close() - } + for _, connectionContainer := range c.connections { + ccLog := c.log.With().Stringer("connectionContainer", connectionContainer).Logger() + ccLog.Trace().Msg("Closing connection") + // Mark the connection as being closed to not try to re-establish it. + connectionContainer.closed = true + // Try to get a lease as this way we kow we're not closing the connection + // while some go func is still using it. + ccLog.Trace().Msg("getting a lease") + ctx, cancel := context.WithTimeout(context.TODO(), c.maxWaitTime) + connChan, errChan := connectionContainer.lease(ctx) + select { + // We're just getting the lease as this way we can be sure nobody else is using it. + // We also really don't care if it worked, or not ... it's just an attempt of being + // nice. + case _ = <-connChan: + ccLog.Debug().Msg("Gracefully closing connection ...") + // Give back the connection. + if connectionContainer.connection != nil { + ccLog.Trace().Msg("closing actual connection") + if err := connectionContainer.connection.Close(); err != nil { + ccLog.Debug().Err(err).Msg("Error while closing connection") } - - c.log.Trace().Msg("Writing response") - select { - case ch <- newDefaultPlcConnectionCacheCloseResult(c, nil): - default: - c.log.Trace().Msg("Channel full, dropping response") + } + case err := <-errChan: + ccLog.Debug().Err(err).Msg("Error while trying to get lease on connection, ignoring.") + // If we're timing out brutally kill the connection. + case <-ctx.Done(): + ccLog.Debug().Msg("Forcefully closing connection ...") + // Forcefully close this connection. + if connectionContainer.connection != nil { + if err := connectionContainer.connection.Close(); err != nil { + ccLog.Debug().Err(err).Msg("Error while closing connection") } - c.log.Debug().Msg("Closing connection cache finished.") - }) + } } - }) + cancel() - return ch + c.log.Debug().Msg("Closing connection cache finished.") + } + return nil } func (c *plcConnectionCache) String() string { diff --git a/plc4go/pkg/api/cache/PlcConnectionCache_test.go b/plc4go/pkg/api/cache/PlcConnectionCache_test.go index a8b8d8bc2b..cd20cc0deb 100644 --- a/plc4go/pkg/api/cache/PlcConnectionCache_test.go +++ b/plc4go/pkg/api/cache/PlcConnectionCache_test.go @@ -20,6 +20,7 @@ package cache import ( + "strings" "testing" "time" @@ -35,7 +36,7 @@ import ( "github.com/apache/plc4x/plc4go/spi/tracer" ) -var debugTimeout = 1 +var debugTimeout = 100 func TestPlcConnectionCache_GetConnection(t *testing.T) { type fields struct { @@ -176,51 +177,48 @@ func TestPlcConnectionCache_Close(t *testing.T) { } } // Close all connections. - cacheCloseResults := cc.Close() - // Wait for all connections to be closed. - select { - case cacheCloseResult := <-cacheCloseResults: - if tt.wantErr && (cacheCloseResult.GetErr() == nil) { - t.Errorf("PlcConnectionCache.Close() = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr) - } else if cacheCloseResult.GetErr() != nil { - t.Errorf("PlcConnectionCache.Close() error = %v, wantErr %v", cacheCloseResult.GetErr(), tt.wantErr) - } - case <-time.After(10 * time.Second): - if !tt.wantTimeout { - t.Errorf("PlcConnectionCache.Close() got timeout") - } - } - + assert.NoError(t, cc.Close()) }) } } -func (c *plcConnectionCache) readFromPlc(t *testing.T, preConnectJob func(), connectionString string, resourceString string) <-chan []tracer.TraceEntry { - ch := make(chan []tracer.TraceEntry) +func readFromPlc(t *testing.T, c *plcConnectionCache, preConnectJob func(), connectionString string, resourceString string) <-chan []tracer.TraceEntry { + t.Helper() + t.Log("readFromPlc") + t.Log("Creating tracer channel") + tracerChan := make(chan []tracer.TraceEntry, 1) if preConnectJob != nil { + t.Log("Executing preConnectJob") preConnectJob() } + t.Log("Getting connection from cache") // Get a connection connection, err := c.GetConnection(t.Context(), connectionString) if err != nil { t.Errorf("PlcConnectionCache.GetConnection() error = %v", err) return nil } + t.Log("Got connection from cache") defer func() { + t.Log("Closing connection") if err := connection.Close(); err != nil { - c.log.Debug().Err(err).Msg("Error closing connection") + t.Log("Error closing connection", err) } + tracerChan <- connection.(interface{ GetLastTraces() []tracer.TraceEntry }).GetLastTraces() + t.Log("Closed connection") }() // Prepare a read request. + t.Log("Preparing read request") readRequest, err := connection.ReadRequestBuilder().AddTagAddress("test", resourceString).Build() if err != nil { t.Errorf("PlcConnectionCache.ReadRequest.Build() error = %v", err) - return ch + return tracerChan } // Execute the read request. + t.Log("Executing read request") execution := readRequest.Execute(t.Context()) select { case readRequestResult := <-execution: @@ -231,14 +229,25 @@ func (c *plcConnectionCache) readFromPlc(t *testing.T, preConnectJob func(), con case <-time.After(1 * time.Second): t.Errorf("PlcConnectionCache.ReadRequest.Read() timeout") } - return ch + return tracerChan } -func (c *plcConnectionCache) executeAndTestReadFromPlc(t *testing.T, preConnectJob func(), connectionString string, resourceString string, expectedTraceEntries []string, expectedNumTotalConnections int) <-chan struct{} { - ch := make(chan struct{}) +func executeAndTestReadFromPlc(t *testing.T, c *plcConnectionCache, preConnectJob func(), connectionString string, resourceString string, expectedTraceEntries []string, expectedNumTotalConnections int) <-chan struct{} { + t.Helper() + ch := make(chan struct{}, 1) c.wg.Go(func() { + t.Log("Starting goroutine") // Read once from the c. - traces := <-c.readFromPlc(t, preConnectJob, connectionString, resourceString) + t.Log("Reading from the cache") + var traces []tracer.TraceEntry + select { + case traces = <-readFromPlc(t, c, preConnectJob, connectionString, resourceString): + case <-t.Context().Done(): + t.Log("Context done", t.Context().Err()) + return + } + + t.Log("Finished reading from the cache") // In the log we should see one "Successfully connected" entry. if len(traces) != len(expectedTraceEntries) { @@ -246,16 +255,20 @@ func (c *plcConnectionCache) executeAndTestReadFromPlc(t *testing.T, preConnectJ ch <- struct{}{} return } + t.Log("Checking trace entries") for i, expectedTraceEntry := range expectedTraceEntries { currentTraceEntry := traces[i].Operation + "-" + traces[i].Message if expectedTraceEntry != currentTraceEntry { t.Errorf("Expected %s as trace entry but got %s", expectedTraceEntry, currentTraceEntry) } } + t.Log("Trace entries are as expected") + t.Log("Checking number of connections in the cache") // Now there should be one connection in the c. if len(c.connections) != expectedNumTotalConnections { t.Errorf("Expected %d connections in the c but got %d", expectedNumTotalConnections, len(c.connections)) } + t.Log("Number of connections in the cache is as expected") ch <- struct{}{} }) return ch @@ -268,7 +281,7 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t *testing.T) { assert.NoError(t, driverManager.Close()) }) driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ + cache := &plcConnectionCache{ driverManager: driverManager, maxLeaseTime: 5 * time.Second, maxWaitTime: 25 * time.Second, @@ -284,8 +297,9 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t *testing.T) { } // Read once from the cache. - finishedChan := cache.executeAndTestReadFromPlc( + finishedChan := executeAndTestReadFromPlc( t, + cache, nil, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL", @@ -306,8 +320,9 @@ func TestPlcConnectionCache_ReusingAnExistingConnection(t *testing.T) { } // Request the same connection for a second time. - finishedChan = cache.executeAndTestReadFromPlc( + finishedChan = executeAndTestReadFromPlc( t, + cache, nil, "simulated://1.2.3.4:42?traceEnabled=true", "RANDOM/test_random:BOOL", @@ -345,7 +360,7 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t *testing.T) { assert.NoError(t, driverManager.Close()) }) driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ + cache := &plcConnectionCache{ driverManager: driverManager, maxLeaseTime: 5 * time.Second, maxWaitTime: 25 * time.Second, @@ -364,8 +379,9 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t *testing.T) { floodGate.Lock() // We use a cas mutex write lock to lock the floodgate // Read once from the cache. - firstRun := cache.executeAndTestReadFromPlc( + firstRun := executeAndTestReadFromPlc( t, + cache, func() { floodGate.RLock() defer floodGate.RUnlock() @@ -387,8 +403,9 @@ func TestPlcConnectionCache_MultipleConcurrentConnectionRequests(t *testing.T) { // As the connection takes 100ms, the second connection request will come // in while the first is still not finished. So in theory it would have // to wait for the first operation to be finished first. - secondRun := cache.executeAndTestReadFromPlc( + secondRun := executeAndTestReadFromPlc( t, + cache, func() { floodGate.RLock() defer floodGate.RUnlock() @@ -460,7 +477,7 @@ func TestPlcConnectionCache_ConnectWithError(t *testing.T) { t.Error("An error was expected") return } - if err.Error() != "hurz" { + if !strings.Contains(err.Error(), "hurz") { t.Errorf("An error '%s' was expected, but got '%s'", "hurz", err) } t.Log(conn) @@ -496,28 +513,24 @@ func TestPlcConnectionCache_ReturningConnectionWithPingError(t *testing.T) { conn, err := cache.GetConnection(t.Context(), "simulated://1.2.3.4:42?pingError=hurz&traceEnabled=true") if err != nil { t.Errorf("PlcConnectionCache.GetConnection() error = %v", err) + t.FailNow() } connection := conn.(*plcConnectionLease) - if connection != nil { - err := connection.Close() - if err != nil { - /*traces := (closeResult.(_default.DefaultPlcConnectionCloseResult)).GetTraces() - // We expect 4 traces (Connect start & success and Ping start and error. - require.Len(t, traces, 4, "Expected %d trace entries but got %d", 4, len(traces)) - if traces[0].Operation+"-"+traces[0].Message != "connect-started" { - t.Errorf("Expected '%s' as first trace message, but got '%s'", "connect-started", traces[0]) - } - if traces[1].Operation+"-"+traces[1].Message != "connect-success" { - t.Errorf("Expected '%s' as second trace message, but got '%s'", "connect-success", traces[1]) - } - if traces[2].Operation+"-"+traces[2].Message != "ping-started" { - t.Errorf("Expected '%s' as third trace message, but got '%s'", "ping-started", traces[2]) - } - if traces[3].Operation+"-"+traces[3].Message != "ping-error: hurz" { - t.Errorf("Expected '%s' as fourth trace message, but got '%s'", "ping-error: hurz", traces[3]) - }*/ - } else { - t.Errorf("Expected a result, but got nil") + if err := connection.Close(); err != nil { + traces := connection.GetLastTraces() + // We expect 4 traces (Connect start & success and Ping start and error. + require.Len(t, traces, 4, "Expected %d trace entries but got %d", 4, len(traces)) + if traces[0].Operation+"-"+traces[0].Message != "connect-started" { + t.Errorf("Expected '%s' as first trace message, but got '%s'", "connect-started", traces[0]) + } + if traces[1].Operation+"-"+traces[1].Message != "connect-success" { + t.Errorf("Expected '%s' as second trace message, but got '%s'", "connect-success", traces[1]) + } + if traces[2].Operation+"-"+traces[2].Message != "ping-started" { + t.Errorf("Expected '%s' as third trace message, but got '%s'", "ping-started", traces[2]) + } + if traces[3].Operation+"-"+traces[3].Message != "ping-error: hurz" { + t.Errorf("Expected '%s' as fourth trace message, but got '%s'", "ping-error: hurz", traces[3]) } } } @@ -531,7 +544,7 @@ func TestPlcConnectionCache_PingTimeout(t *testing.T) { assert.NoError(t, driverManager.Close()) }) driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ + cache := &plcConnectionCache{ driverManager: driverManager, maxLeaseTime: 5 * time.Second, maxWaitTime: 25 * time.Second, @@ -547,8 +560,9 @@ func TestPlcConnectionCache_PingTimeout(t *testing.T) { } // Read once from the cache. - firstRun := cache.executeAndTestReadFromPlc( + firstRun := executeAndTestReadFromPlc( t, + cache, nil, "simulated://1.2.3.4:42?pingDelay=10000&traceEnabled=true", "RANDOM/test_random:BOOL", @@ -580,7 +594,7 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t *testin assert.NoError(t, driverManager.Close()) }) driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ + cache := &plcConnectionCache{ driverManager: driverManager, maxLeaseTime: 5 * time.Second, maxWaitTime: 25 * time.Second, @@ -596,8 +610,9 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t *testin } // Read once from the cache. - firstRun := cache.executeAndTestReadFromPlc( + firstRun := executeAndTestReadFromPlc( t, + cache, nil, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL", @@ -616,8 +631,9 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t *testin // As the connection takes 100ms, the second connection request will come // in while the first is still not finished. So in theory it would have // to wait for the first operation to be finished first. - secondRun := cache.executeAndTestReadFromPlc( + secondRun := executeAndTestReadFromPlc( t, + cache, nil, "simulated://1.2.3.4:42?pingDelay=10000&connectionDelay=100&traceEnabled=true", "RANDOM/test_random:BOOL", @@ -659,138 +675,6 @@ func TestPlcConnectionCache_SecondCallGetNewConnectionAfterPingTimeout(t *testin assert.Equal(t, "success", traces[4].Message, "Unexpected message") } -// In this test the first client requests a connection, but doesn't listen on the response-channel -// This shouldn't block the connection cache. -func TestPlcConnectionCache_FistReadGivesUpBeforeItGetsTheConnectionSoSecondOneTakesOver(t *testing.T) { - logger := testutils.ProduceTestingLogger(t) - driverManager := plc4go.NewPlcDriverManager(config.WithCustomLogger(logger)) - t.Cleanup(func() { - assert.NoError(t, driverManager.Close()) - }) - driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ - driverManager: driverManager, - maxLeaseTime: 5 * time.Second, - maxWaitTime: 25 * time.Second, - cacheLock: lock.NewCASMutex(), - connections: make(map[string]*connectionContainer), - tracer: nil, - } - cache.EnableTracer() - - // Initially there should be no connection in the cache. - if len(cache.connections) != 0 { - t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections)) - } - - // Intentionally just ignore the response. - cache.GetConnection(t.Context(), "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true") - - time.Sleep(1 * time.Millisecond) - - // Read once from the cache. - // NOTE: It doesn't contain the connect-part, as the previous connection handled that. - firstRun := cache.executeAndTestReadFromPlc( - t, - nil, - "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", - "RANDOM/test_random:BOOL", - []string{ - "read-started", - "read-success", - "ping-started", - "ping-success", - }, - 1, - ) - - select { - case _ = <-firstRun: - break - case <-time.After(30 * time.Second * time.Duration(debugTimeout)): - t.Errorf("Timeout") - } -} - -func TestPlcConnectionCache_SecondConnectionGivenUpWaiting(t *testing.T) { - logger := testutils.ProduceTestingLogger(t) - driverManager := plc4go.NewPlcDriverManager(config.WithCustomLogger(logger)) - t.Cleanup(func() { - assert.NoError(t, driverManager.Close()) - }) - driverManager.RegisterDriver(simulated.NewDriver(options.WithCustomLogger(logger))) - cache := plcConnectionCache{ - driverManager: driverManager, - maxLeaseTime: 5 * time.Second, - maxWaitTime: 25 * time.Second, - cacheLock: lock.NewCASMutex(), - connections: make(map[string]*connectionContainer), - tracer: nil, - } - cache.EnableTracer() - - // Initially there should be no connection in the cache. - if len(cache.connections) != 0 { - t.Errorf("Expected %d connections in the cache but got %d", 0, len(cache.connections)) - } - - // Read once from the cache. - firstRun := cache.executeAndTestReadFromPlc( - t, - nil, - "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true", - "RANDOM/test_random:BOOL", - []string{ - "connect-started", - "connect-success", - "read-started", - "read-success", - "ping-started", - "ping-success", - }, - 1, - ) - - time.Sleep(1 * time.Millisecond) - - // Almost instantly we try to get a new connection but don't listen for the result - cache.GetConnection(t.Context(), "simulated://1.2.3.4:42?connectionDelay=100&traceEnabled=true") - - // Wait for the first operation to finish - select { - case _ = <-firstRun: - case <-time.After(30 * time.Second * time.Duration(debugTimeout)): - t.Errorf("Timeout") - } - - // Wait for 1s to have the connection cache timeout (10ms) the lease as nobody's listening. - time.Sleep(1 * time.Second) - - // This should be quite equal to the serial case as the connections are requested serially. - assert.NotNil(t, cache.GetTracer(), "Tracer should be available") - traces := cache.GetTracer().GetTraces() - if assert.Equal(t, 5, len(traces), "Unexpected number of trace entries") { - // First is needs to create a new container for this connection - assert.Equal(t, "create new cached connection", traces[0].Message, "Unexpected message") - // Then it gets a lease for the connection - assert.Equal(t, "lease", traces[1].Message, "Unexpected message") - // And a second time - assert.Equal(t, "lease", traces[2].Message, "Unexpected message") - // Now the delay of 100ms is over, and we should see the first success - assert.Equal(t, "success", traces[3].Message, "Unexpected message") - // Now the first operation is finished, and we should see the second give up - assert.Equal(t, "client given up", traces[4].Message, "Unexpected message") - } else if len(traces) > 0 { - var values string - for _, traceEntry := range traces { - values = values + traceEntry.Operation + "-" + traceEntry.Message + ", " - } - t.Errorf("Got traces: %s", values) - } else { - t.Error("No traces") - } -} - func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) { logger := testutils.ProduceTestingLogger(t) driverManager := plc4go.NewPlcDriverManager(config.WithCustomLogger(logger)) @@ -821,24 +705,19 @@ func TestPlcConnectionCache_MaximumWaitTimeReached(t *testing.T) { time.Sleep(1 * time.Millisecond) // Just make sure the first two connections are returned as soon as they are received - if assert.Nil(t, firstConn) { - // Give back the connection. - _ = firstConn.Close() - } + _ = firstConn.Close() + // Second one blocks secondConn, err := cache.GetConnection(t.Context(), "simulated://1.2.3.4:42?connectionDelay=100&pingDelay=4000&traceEnabled=true") require.NoError(t, err) + t.Cleanup(func() { + _ = secondConn.Close() + }) time.Sleep(1 * time.Millisecond) - // Just make sure the first two connections are returned as soon as they are received - if assert.Nil(t, firstConn) { - // Give back the connection. - _ = secondConn.Close() - } - // The third connection should be given up by the cache thrirdConn, err := cache.GetConnection(t.Context(), "simulated://1.2.3.4:42?connectionDelay=100&pingDelay=4000&traceEnabled=true") - require.NoError(t, err) - require.NotNil(t, thrirdConn) + require.Error(t, err) + require.Nil(t, thrirdConn) } diff --git a/plc4go/pkg/api/cache/common.go b/plc4go/pkg/api/cache/common.go index 7369f89c42..2f7dac3eea 100644 --- a/plc4go/pkg/api/cache/common.go +++ b/plc4go/pkg/api/cache/common.go @@ -19,11 +19,6 @@ package cache -type PlcConnectionCacheCloseResult interface { - GetConnectionCache() PlcConnectionCache - GetErr() error -} - /////////////////////////////////////// /////////////////////////////////////// // @@ -83,33 +78,3 @@ func (c *connectionErrorEvent) getConnectionContainer() *connectionContainer { func (c *connectionErrorEvent) getError() error { return c.err } - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// PlcConnectionCacheCloseResult / plcConnectionCacheCloseResult -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -type plcConnectionCacheCloseResult struct { - connectionCache PlcConnectionCache - err error -} - -func newDefaultPlcConnectionCacheCloseResult(connectionCache PlcConnectionCache, err error) PlcConnectionCacheCloseResult { - return &plcConnectionCacheCloseResult{ - connectionCache: connectionCache, - err: err, - } -} - -func (p plcConnectionCacheCloseResult) GetConnectionCache() PlcConnectionCache { - return p.connectionCache -} - -func (p plcConnectionCacheCloseResult) GetErr() error { - return p.err -} - -// -// Internal section -// -/////////////////////////////////////// -/////////////////////////////////////// diff --git a/plc4go/pkg/api/cache/connectionContainer.go b/plc4go/pkg/api/cache/connectionContainer.go index 0e29c88391..1f7f7dba31 100644 --- a/plc4go/pkg/api/cache/connectionContainer.go +++ b/plc4go/pkg/api/cache/connectionContainer.go @@ -22,7 +22,6 @@ package cache import ( "context" "fmt" - "sync" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -46,12 +45,11 @@ type connectionContainer struct { // Listeners for connection events. listeners []connectionListener - wg sync.WaitGroup // use to track spawned go routines - log zerolog.Logger } type connectionRequest struct { + ctx context.Context connChan chan plc4go.PlcConnection errChan chan error } @@ -100,13 +98,20 @@ func (c *connectionContainer) connect(ctx context.Context) { // Send a failure to all waiting clients. if len(c.queue) > 0 { + c.log.Trace().Msg("notifies waiting clients of error") for _, waitingClient := range c.queue { select { case waitingClient.errChan <- err: + c.log.Trace().Msg("sent error to waiting client") + case <-waitingClient.ctx.Done(): + c.log.Trace().Msg("waiting client timed out") case <-ctx.Done(): + c.log.Trace().Msg("context timed out") } } c.queue = nil + } else { + c.log.Trace().Msg("no waiting clients") } return } @@ -122,7 +127,8 @@ func (c *connectionContainer) connect(ctx context.Context) { // Mark the connection as idle for now. c.state = StateIdle // If there is a request in the queue, hand out the connection to that. - if len(c.queue) > 0 { + if waitingClientsLen := len(c.queue); waitingClientsLen > 0 { + c.log.Trace().Int("waitingClientsLen", waitingClientsLen).Msg("notifies waiting clients of connection") // Get the first in the queue. queueHead := c.queue[0] c.queue = c.queue[1:] @@ -134,6 +140,8 @@ func (c *connectionContainer) connect(ctx context.Context) { // as the getConnection function of the connection cache // is definitely eagerly waiting for input. queueHead.connChan <- connection + } else { + c.log.Trace().Msg("no waiting clients") } } @@ -145,7 +153,7 @@ func (c *connectionContainer) addListener(listener connectionListener) { c.listeners = append(c.listeners, listener) } -func (c *connectionContainer) lease() (chan plc4go.PlcConnection, chan error) { +func (c *connectionContainer) lease(ctx context.Context) (chan plc4go.PlcConnection, chan error) { c.lock.Lock() defer c.lock.Unlock() @@ -166,7 +174,7 @@ func (c *connectionContainer) lease() (chan plc4go.PlcConnection, chan error) { case StateInUse, StateInitialized: // If the connection is currently busy or not finished initializing, // add the new channel to the queue for this connection. - c.queue = append(c.queue, connectionRequest{connChan: connectionChan, errChan: errorChan}) + c.queue = append(c.queue, connectionRequest{ctx: ctx, connChan: connectionChan, errChan: errorChan}) c.log.Debug().Str("connectionString", c.connectionString). Int("waiting-queue-size", len(c.queue)). Msg("Added lease-request to queue.") @@ -200,24 +208,21 @@ func (c *connectionContainer) returnConnection(ctx context.Context, newState cac } // Check how many others are waiting for this connection. - if len(c.queue) > 0 { + if waitingClientsLen := len(c.queue); waitingClientsLen > 0 { + c.log.Trace().Int("waitingClientsLen", waitingClientsLen).Msg("notifies waiting clients of connection return") // There are waiting clients, give the connection to the next client in the line. next := c.queue[0] c.queue = c.queue[1:] c.leaseCounter++ connection := newPlcConnectionLease(c, c.leaseCounter, c.connection) - // Send asynchronously as the receiver might have given up waiting, - // and we don'c want anything to block here. 1ms should be enough for - // the calling process to reach the blocking read. - c.wg.Go(func() { - // In this case we don'c need to check for blocks - // as the getConnection function of the connection cache - // is definitely eagerly waiting for input. - next.connChan <- connection - c.log.Debug().Str("connectionString", c.connectionString). - Int("waiting-queue-size", len(c.queue)). - Msg("Returned connection to the next client waiting.") - }) + + // In this case we don'c need to check for blocks + // as the getConnection function of the connection cache + // is definitely eagerly waiting for input. + next.connChan <- connection + c.log.Debug().Str("connectionString", c.connectionString). + Int("waiting-queue-size", len(c.queue)). + Msg("Returned connection to the next client waiting.") } else { // Otherwise, just mark the connection as idle. c.log.Debug().Str("connectionString", c.connectionString). diff --git a/plc4go/pkg/api/cache/connectionContainer_test.go b/plc4go/pkg/api/cache/connectionContainer_test.go index 4565368acc..451b709295 100644 --- a/plc4go/pkg/api/cache/connectionContainer_test.go +++ b/plc4go/pkg/api/cache/connectionContainer_test.go @@ -20,6 +20,7 @@ package cache import ( + "context" "fmt" "testing" @@ -200,9 +201,13 @@ func Test_connectionContainer_lease(t1 *testing.T) { queue []connectionRequest listeners []connectionListener } + type args struct { + ctx context.Context + } tests := []struct { name string fields fields + args args setup func(t *testing.T, fields *fields) }{ { @@ -211,6 +216,9 @@ func Test_connectionContainer_lease(t1 *testing.T) { connectionString: "simulated://1.2.3.4:42", lock: lock.NewCASMutex(), }, + args: args{ + ctx: t1.Context(), + }, setup: func(t *testing.T, fields *fields) { logger := testutils.ProduceTestingLogger(t) @@ -241,7 +249,7 @@ func Test_connectionContainer_lease(t1 *testing.T) { queue: tt.fields.queue, listeners: tt.fields.listeners, } - lease, errors := c.lease() + lease, errors := c.lease(tt.args.ctx) assert.NotNil(t, lease) assert.NotNil(t, errors) }) diff --git a/plc4go/pkg/api/cache/mocks_test.go b/plc4go/pkg/api/cache/mocks_test.go index 66165cbd3f..b382a54404 100644 --- a/plc4go/pkg/api/cache/mocks_test.go +++ b/plc4go/pkg/api/cache/mocks_test.go @@ -60,20 +60,18 @@ func (_m *MockPlcConnectionCache) EXPECT() *MockPlcConnectionCache_Expecter { } // Close provides a mock function for the type MockPlcConnectionCache -func (_mock *MockPlcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult { +func (_mock *MockPlcConnectionCache) Close() error { ret := _mock.Called() if len(ret) == 0 { panic("no return value specified for Close") } - var r0 <-chan PlcConnectionCacheCloseResult - if returnFunc, ok := ret.Get(0).(func() <-chan PlcConnectionCacheCloseResult); ok { + var r0 error + if returnFunc, ok := ret.Get(0).(func() error); ok { r0 = returnFunc() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan PlcConnectionCacheCloseResult) - } + r0 = ret.Error(0) } return r0 } @@ -95,12 +93,12 @@ func (_c *MockPlcConnectionCache_Close_Call) Run(run func()) *MockPlcConnectionC return _c } -func (_c *MockPlcConnectionCache_Close_Call) Return(plcConnectionCacheCloseResultCh <-chan PlcConnectionCacheCloseResult) *MockPlcConnectionCache_Close_Call { - _c.Call.Return(plcConnectionCacheCloseResultCh) +func (_c *MockPlcConnectionCache_Close_Call) Return(err error) *MockPlcConnectionCache_Close_Call { + _c.Call.Return(err) return _c } -func (_c *MockPlcConnectionCache_Close_Call) RunAndReturn(run func() <-chan PlcConnectionCacheCloseResult) *MockPlcConnectionCache_Close_Call { +func (_c *MockPlcConnectionCache_Close_Call) RunAndReturn(run func() error) *MockPlcConnectionCache_Close_Call { _c.Call.Return(run) return _c } @@ -173,123 +171,6 @@ func (_c *MockPlcConnectionCache_GetConnection_Call) RunAndReturn(run func(ctx c return _c } -// NewMockPlcConnectionCacheCloseResult creates a new instance of MockPlcConnectionCacheCloseResult. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockPlcConnectionCacheCloseResult(t interface { - mock.TestingT - Cleanup(func()) -}) *MockPlcConnectionCacheCloseResult { - mock := &MockPlcConnectionCacheCloseResult{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// MockPlcConnectionCacheCloseResult is an autogenerated mock type for the PlcConnectionCacheCloseResult type -type MockPlcConnectionCacheCloseResult struct { - mock.Mock -} - -type MockPlcConnectionCacheCloseResult_Expecter struct { - mock *mock.Mock -} - -func (_m *MockPlcConnectionCacheCloseResult) EXPECT() *MockPlcConnectionCacheCloseResult_Expecter { - return &MockPlcConnectionCacheCloseResult_Expecter{mock: &_m.Mock} -} - -// GetConnectionCache provides a mock function for the type MockPlcConnectionCacheCloseResult -func (_mock *MockPlcConnectionCacheCloseResult) GetConnectionCache() PlcConnectionCache { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for GetConnectionCache") - } - - var r0 PlcConnectionCache - if returnFunc, ok := ret.Get(0).(func() PlcConnectionCache); ok { - r0 = returnFunc() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(PlcConnectionCache) - } - } - return r0 -} - -// MockPlcConnectionCacheCloseResult_GetConnectionCache_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetConnectionCache' -type MockPlcConnectionCacheCloseResult_GetConnectionCache_Call struct { - *mock.Call -} - -// GetConnectionCache is a helper method to define mock.On call -func (_e *MockPlcConnectionCacheCloseResult_Expecter) GetConnectionCache() *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call { - return &MockPlcConnectionCacheCloseResult_GetConnectionCache_Call{Call: _e.mock.On("GetConnectionCache")} -} - -func (_c *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call) Run(run func()) *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call) Return(plcConnectionCache PlcConnectionCache) *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call { - _c.Call.Return(plcConnectionCache) - return _c -} - -func (_c *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call) RunAndReturn(run func() PlcConnectionCache) *MockPlcConnectionCacheCloseResult_GetConnectionCache_Call { - _c.Call.Return(run) - return _c -} - -// GetErr provides a mock function for the type MockPlcConnectionCacheCloseResult -func (_mock *MockPlcConnectionCacheCloseResult) GetErr() error { - ret := _mock.Called() - - if len(ret) == 0 { - panic("no return value specified for GetErr") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func() error); ok { - r0 = returnFunc() - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockPlcConnectionCacheCloseResult_GetErr_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetErr' -type MockPlcConnectionCacheCloseResult_GetErr_Call struct { - *mock.Call -} - -// GetErr is a helper method to define mock.On call -func (_e *MockPlcConnectionCacheCloseResult_Expecter) GetErr() *MockPlcConnectionCacheCloseResult_GetErr_Call { - return &MockPlcConnectionCacheCloseResult_GetErr_Call{Call: _e.mock.On("GetErr")} -} - -func (_c *MockPlcConnectionCacheCloseResult_GetErr_Call) Run(run func()) *MockPlcConnectionCacheCloseResult_GetErr_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockPlcConnectionCacheCloseResult_GetErr_Call) Return(err error) *MockPlcConnectionCacheCloseResult_GetErr_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockPlcConnectionCacheCloseResult_GetErr_Call) RunAndReturn(run func() error) *MockPlcConnectionCacheCloseResult_GetErr_Call { - _c.Call.Return(run) - return _c -} - // newMockconnectionListener creates a new instance of mockconnectionListener. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockconnectionListener(t interface { diff --git a/plc4go/pkg/api/cache/plcConnectionLease.go b/plc4go/pkg/api/cache/plcConnectionLease.go index 289342b987..baadf3c26a 100644 --- a/plc4go/pkg/api/cache/plcConnectionLease.go +++ b/plc4go/pkg/api/cache/plcConnectionLease.go @@ -36,6 +36,8 @@ type plcConnectionLease struct { leaseId uint32 // The actual connection being cached. connection tracedPlcConnection + // the last traces of this connection + lastTraces []tracer.TraceEntry } func newPlcConnectionLease(connectionContainer *connectionContainer, leaseId uint32, connection tracedPlcConnection) *plcConnectionLease { @@ -110,12 +112,11 @@ func (t *plcConnectionLease) Close() error { if t.IsTraceEnabled() { _tracer := t.GetTracer() // Save all traces. - traces := _tracer.GetTraces() + t.lastTraces = _tracer.GetTraces() // Clear the log. _tracer.ResetTraces() // Reset the connection id back to the one without the lease-id. _tracer.SetConnectionId(t.connection.GetConnectionId()) - _ = traces // TODO: do something with that } // Return the connection to the connection container and don't actually close it. @@ -127,6 +128,10 @@ func (t *plcConnectionLease) Close() error { return err } +func (t *plcConnectionLease) GetLastTraces() []tracer.TraceEntry { + return t.lastTraces +} + func (t *plcConnectionLease) IsConnected() bool { if t.connection == nil { return false diff --git a/plc4go/pkg/api/cache/plcConnectionLease_test.go b/plc4go/pkg/api/cache/plcConnectionLease_test.go index 5bb11d1362..fd7271c904 100644 --- a/plc4go/pkg/api/cache/plcConnectionLease_test.go +++ b/plc4go/pkg/api/cache/plcConnectionLease_test.go @@ -50,7 +50,7 @@ func TestLeasedPlcConnection_IsTraceEnabled(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -84,7 +84,7 @@ func TestLeasedPlcConnection_GetTracer(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -113,7 +113,7 @@ func TestLeasedPlcConnection_GetConnectionId(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -140,7 +140,7 @@ func TestLeasedPlcConnection_Connect(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -167,7 +167,7 @@ func TestLeasedPlcConnection_BlockingClose(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -196,7 +196,7 @@ func TestLeasedPlcConnection_Close(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -225,7 +225,7 @@ func TestLeasedPlcConnection_IsConnected(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -256,7 +256,7 @@ func TestLeasedPlcConnection_Ping(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -294,7 +294,7 @@ func TestLeasedPlcConnection_GetMetadata(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -338,7 +338,7 @@ func TestLeasedPlcConnection_ReadRequestBuilder(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -379,7 +379,7 @@ func TestLeasedPlcConnection_WriteRequestBuilder(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -420,7 +420,7 @@ func TestLeasedPlcConnection_SubscriptionRequestBuilder(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -461,7 +461,7 @@ func TestLeasedPlcConnection_UnsubscriptionRequestBuilder(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer() @@ -510,7 +510,7 @@ func TestLeasedPlcConnection_BrowseRequestBuilder(t *testing.T) { tracer: nil, } t.Cleanup(func() { - <-cache.Close() + _ = cache.Close() }) cache.EnableTracer()
