This is an automated email from the ASF dual-hosted git repository. sruehl pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 00d8afa7aaf7c3062f2eaa9be6ae3f90f1c2fb02 Author: Sebastian Rühl <[email protected]> AuthorDate: Thu Nov 20 13:17:22 2025 +0100 feat(plc4go/spi): add interaction id to better identify failing requests --- plc4go/internal/ads/DiscoveryMessageCodec.go | 4 +- plc4go/internal/ads/Interactions.go | 12 +- plc4go/internal/ads/MessageCodec.go | 4 +- .../bacnetip/ApplicationLayerMessageCodec.go | 6 +- plc4go/internal/bacnetip/MessageCodec.go | 4 +- plc4go/internal/bacnetip/Reader.go | 2 +- plc4go/internal/cbus/Browser_test.go | 2 +- plc4go/internal/cbus/Connection.go | 136 ++++---- plc4go/internal/cbus/MessageCodec.go | 4 +- plc4go/internal/cbus/MessageCodec_test.go | 6 +- plc4go/internal/cbus/Reader.go | 2 +- plc4go/internal/cbus/Writer.go | 2 +- plc4go/internal/eip/Connection.go | 10 +- plc4go/internal/eip/MessageCodec.go | 4 +- plc4go/internal/eip/Reader.go | 2 +- .../knxnetip/ConnectionInternalOperations.go | 375 +++++++++++---------- plc4go/internal/knxnetip/Discoverer.go | 2 +- plc4go/internal/knxnetip/MessageCodec.go | 6 +- plc4go/internal/modbus/Connection.go | 2 +- plc4go/internal/modbus/MessageCodec.go | 4 +- plc4go/internal/modbus/Reader.go | 2 +- plc4go/internal/modbus/Writer.go | 2 +- plc4go/internal/opcua/MessageCodec.go | 4 +- plc4go/internal/opcua/SecureChannel.go | 18 +- plc4go/internal/s7/Connection.go | 6 +- plc4go/internal/s7/MessageCodec.go | 4 +- plc4go/internal/s7/Reader.go | 2 +- plc4go/internal/s7/Writer.go | 2 +- plc4go/spi/MessageCodec.go | 6 +- plc4go/spi/default/DefaultCodec.go | 22 +- plc4go/spi/default/DefaultCodec_test.go | 22 +- plc4go/spi/default/defaultExpectation.go | 6 +- plc4go/spi/default/mocks_test.go | 214 +++++++----- plc4go/spi/mocks_test.go | 94 +++--- plc4go/spi/transports/test/TransportInstance.go | 2 +- 35 files changed, 545 insertions(+), 450 deletions(-) diff --git a/plc4go/internal/ads/DiscoveryMessageCodec.go b/plc4go/internal/ads/DiscoveryMessageCodec.go index 2077790658..1688760948 100644 --- a/plc4go/internal/ads/DiscoveryMessageCodec.go +++ b/plc4go/internal/ads/DiscoveryMessageCodec.go @@ -54,8 +54,8 @@ func (m *DiscoveryMessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *DiscoveryMessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *DiscoveryMessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct tcpPaket := message.(model.AdsDiscovery) // Serialize the request diff --git a/plc4go/internal/ads/Interactions.go b/plc4go/internal/ads/Interactions.go index 5aa02e4286..acfde00629 100644 --- a/plc4go/internal/ads/Interactions.go +++ b/plc4go/internal/ads/Interactions.go @@ -42,7 +42,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model } }() request := m.NewAdsReadDeviceInfoRequest() - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "ads_read_device_info_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false @@ -80,7 +80,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3 } }() request := m.NewAdsReadRequest(indexGroup, indexOffset, length) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "ads_read_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false @@ -118,7 +118,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint } }() request := m.NewAdsWriteRequest(indexGroup, indexOffset, data) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "execute_ads_write_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false @@ -156,7 +156,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup } }() request := m.NewAdsReadWriteRequest(indexGroup, indexOffset, readLength, items, writeData) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "ads_read_write_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false @@ -194,7 +194,7 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context, } }() request := m.NewAdsAddDeviceNotificationRequest(indexGroup, indexOffset, length, transmissionMode, maxDelay, cycleTime) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "ads_add_device_notification_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false @@ -231,7 +231,7 @@ func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Conte } }() request := m.NewAdsDeleteDeviceNotificationRequest(notificationHandle) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "ads_delete_device_notification_request", request, func(message spi.Message) bool { amsTcpPacket, ok := message.(model.AmsTCPPacket) if !ok { return false diff --git a/plc4go/internal/ads/MessageCodec.go b/plc4go/internal/ads/MessageCodec.go index 5bf9e1801d..b263adb443 100644 --- a/plc4go/internal/ads/MessageCodec.go +++ b/plc4go/internal/ads/MessageCodec.go @@ -67,8 +67,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct tcpPaket := message.(model.AmsTCPPacket) // Serialize the request diff --git a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go index e99f31c184..77491c3e53 100644 --- a/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go +++ b/plc4go/internal/bacnetip/ApplicationLayerMessageCodec.go @@ -112,7 +112,7 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool { return m.messageCode.IsRunning() } -func (m *ApplicationLayerMessageCodec) Send(ctx context.Context, message spi.Message) error { +func (m *ApplicationLayerMessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { address, err := pdu.NewAddress(comp.NewArgs(m.remoteAddress)) if err != nil { return err @@ -141,12 +141,12 @@ func (m *ApplicationLayerMessageCodec) Send(ctx context.Context, message spi.Mes return nil } -func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { +func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { // TODO: implement me panic("not yet implemented") } -func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { +func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { address, err := pdu.NewAddress(comp.NewArgs(m.remoteAddress)) if err != nil { return err diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go index d1ae617be2..f94e3f2bff 100644 --- a/plc4go/internal/bacnetip/MessageCodec.go +++ b/plc4go/internal/bacnetip/MessageCodec.go @@ -50,8 +50,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct bvlcPacket := message.(model.BVLC) // Serialize the request diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go index f9604aea46..dc55115712 100644 --- a/plc4go/internal/bacnetip/Reader.go +++ b/plc4go/internal/bacnetip/Reader.go @@ -140,7 +140,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) context.AfterFunc(transactionContext, cancel) // Send the over the wire m.log.Trace().Msg("Send ") - if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "read", apdu, func(message spi.Message) bool { bvlc, ok := message.(readWriteModel.BVLC) if !ok { m.log.Debug().Type("bvlc", bvlc).Msg("Received strange type") diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go index a762976fa8..860463a1af 100644 --- a/plc4go/internal/cbus/Browser_test.go +++ b/plc4go/internal/cbus/Browser_test.go @@ -538,7 +538,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) { currentState.Store(RESET) stateChangeMutex := sync.Mutex{} transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) { - t.Logf("reacting to\n%s", hex.Dump(data)) + t.Logf("reacting to \n%s", hex.Dump(data)) t.Logf("current state %d", currentState.Load()) stateChangeMutex.Lock() defer stateChangeMutex.Unlock() diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go index 0c94af79a9..fd8c631629 100644 --- a/plc4go/internal/cbus/Connection.go +++ b/plc4go/internal/cbus/Connection.go @@ -261,80 +261,84 @@ func (c *Connection) setupConnection(ctx context.Context) error { func (c *Connection) startSubscriptionHandler() { c.log.Debug().Msg("Starting SAL handler") - c.handlerWaitGroup.Go(func() { - salLogger := c.log.With().Str("handlerType", "SAL").Logger() - defer func() { - if err := recover(); err != nil { - salLogger.Error(). - Str("stack", string(debug.Stack())). - Interface("err", err). - Msg("panic-ed") + c.handlerWaitGroup.Go(c.handleSAL) + c.log.Debug().Msg("Starting MMI handler") + c.handlerWaitGroup.Go(c.handleMMI) +} + +func (c *Connection) handleSAL() { + salLogger := c.log.With().Str("handlerType", "SAL").Logger() + defer func() { + if err := recover(); err != nil { + salLogger.Error(). + Str("stack", string(debug.Stack())). + Interface("err", err). + Msg("panic-ed") + } + }() + salLogger.Debug().Msg("SAL handler started") + for c.IsConnected() && c.messageCodec.IsRunning() { + for monitoredSal := range c.messageCodec.monitoredSALs { + if monitoredSal == nil { + salLogger.Trace().Msg("monitoredSal chan closed") + break } - }() - salLogger.Debug().Msg("SAL handler started") - for c.IsConnected() && c.messageCodec.IsRunning() { - for monitoredSal := range c.messageCodec.monitoredSALs { - if monitoredSal == nil { - salLogger.Trace().Msg("monitoredSal chan closed") - break - } - salLogger.Trace(). - Stringer("monitoredSal", monitoredSal). - Msg("got a SAL") - handled := false - for _, subscriber := range c.subscribers { - if ok := subscriber.handleMonitoredSAL(monitoredSal); ok { - salLogger.Debug(). - Stringer("monitoredSal", monitoredSal). - Stringer("subscriber", subscriber). - Msg("handled") - handled = true - } - } - if !handled { + salLogger.Trace(). + Stringer("monitoredSal", monitoredSal). + Msg("got a SAL") + handled := false + for _, subscriber := range c.subscribers { + if ok := subscriber.handleMonitoredSAL(monitoredSal); ok { salLogger.Debug(). Stringer("monitoredSal", monitoredSal). - Msg("SAL was not handled") + Stringer("subscriber", subscriber). + Msg("handled") + handled = true } } + if !handled { + salLogger.Debug(). + Stringer("monitoredSal", monitoredSal). + Msg("SAL was not handled") + } } - salLogger.Info().Msg("handler ended") - }) - c.log.Debug().Msg("Starting MMI handler") - c.handlerWaitGroup.Go(func() { - mmiLogger := c.log.With().Str("handlerType", "MMI").Logger() - defer func() { - if err := recover(); err != nil { - mmiLogger.Error(). - Str("stack", string(debug.Stack())). - Interface("err", err). - Msg("panic-ed") + } + salLogger.Info().Msg("handler ended") +} + +func (c *Connection) handleMMI() { + mmiLogger := c.log.With().Str("handlerType", "MMI").Logger() + defer func() { + if err := recover(); err != nil { + mmiLogger.Error(). + Str("stack", string(debug.Stack())). + Interface("err", err). + Msg("panic-ed") + } + }() + mmiLogger.Debug().Msg("default MMI started") + for c.IsConnected() && c.messageCodec.IsRunning() { + for calReply := range c.messageCodec.monitoredMMIs { + if calReply == nil { + mmiLogger.Trace().Msg("channel closed") + break } - }() - mmiLogger.Debug().Msg("default MMI started") - for c.IsConnected() && c.messageCodec.IsRunning() { - for calReply := range c.messageCodec.monitoredMMIs { - if calReply == nil { - mmiLogger.Trace().Msg("channel closed") - break - } - mmiLogger.Trace().Msg("got a MMI") - handled := false - for _, subscriber := range c.subscribers { - if ok := subscriber.handleMonitoredMMI(calReply); ok { - mmiLogger.Debug(). - Stringer("subscriber", subscriber). - Msg("handled") - handled = true - } - } - if !handled { - mmiLogger.Debug().Msg("MMI was not handled") + mmiLogger.Trace().Msg("got a MMI") + handled := false + for _, subscriber := range c.subscribers { + if ok := subscriber.handleMonitoredMMI(calReply); ok { + mmiLogger.Debug(). + Stringer("subscriber", subscriber). + Msg("handled") + handled = true } } + if !handled { + mmiLogger.Debug().Msg("MMI was not handled") + } } - mmiLogger.Info().Msg("handler ended") - }) + } + mmiLogger.Info().Msg("handler ended") } func (c *Connection) sendReset(ctx context.Context) error { @@ -355,7 +359,7 @@ func (c *Connection) sendReset(ctx context.Context) error { receivedResetEchoChan := make(chan bool, 1) receivedResetEchoErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "send_reset", cBusMessage, func(message spi.Message) bool { c.log.Trace().Msg("Checking message") switch message := message.(type) { case readWriteModel.CBusMessageToClient: @@ -505,7 +509,7 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, paramNo readWriteMode directCommandAckChan := make(chan bool, 1) directCommandAckErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "send_cal_data_write", cBusMessage, func(message spi.Message) bool { switch message := message.(type) { case readWriteModel.CBusMessageToClient: switch reply := message.GetReply().(type) { diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go index bfe8b17bd2..55783800ff 100644 --- a/plc4go/internal/cbus/MessageCodec.go +++ b/plc4go/internal/cbus/MessageCodec.go @@ -98,8 +98,8 @@ func (m *MessageCodec) Disconnect() error { return err } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Interface("message", message).Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Interface("message", message).Msg("Sending message") // Cast the message to the correct type of struct cbusMessage, ok := message.(readWriteModel.CBusMessage) if !ok { diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go index 4668381006..100c058932 100644 --- a/plc4go/internal/cbus/MessageCodec_test.go +++ b/plc4go/internal/cbus/MessageCodec_test.go @@ -48,7 +48,9 @@ func TestMessageCodec_Send(t *testing.T) { monitoredSALs chan readWriteModel.MonitoredSAL } type args struct { - message spi.Message + ctx context.Context + interactionId string + message spi.Message } tests := []struct { name string @@ -105,7 +107,7 @@ func TestMessageCodec_Send(t *testing.T) { monitoredMMIs: tt.fields.monitoredMMIs, monitoredSALs: tt.fields.monitoredSALs, } - tt.wantErr(t, m.Send(t.Context(), tt.args.message), fmt.Sprintf("Send(%v)", tt.args.message)) + tt.wantErr(t, m.Send(t.Context(), t.Name(), tt.args.message), fmt.Sprintf("Send(%v)", tt.args.message)) }) } } diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go index ae000b785b..0639e33462 100644 --- a/plc4go/internal/cbus/Reader.go +++ b/plc4go/internal/cbus/Reader.go @@ -142,7 +142,7 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) { // Send the over the wire m.log.Trace().Msg("send over the wire") - if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_generic_read_message", messageToSend, func(cbusMessage spi.Message) bool { m.log.Trace().Type("cbusMessageType", cbusMessage).Msg("Checking") messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient) if !ok { diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go index 27bbbd737e..fa8b225fb8 100644 --- a/plc4go/internal/cbus/Writer.go +++ b/plc4go/internal/cbus/Writer.go @@ -118,7 +118,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques context.AfterFunc(transactionContext, cancel) // Send the over the wire m.log.Trace().Msg("Send ") - if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "write", messageToSend, func(receivedMessage spi.Message) bool { cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage) if !ok { return false diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go index b0b8926b6b..f960aef0ac 100644 --- a/plc4go/internal/eip/Connection.go +++ b/plc4go/internal/eip/Connection.go @@ -157,7 +157,7 @@ func (c *Connection) Close() error { ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) defer cancelFunc() c.log.Debug().Msg("Sending UnregisterSession EIP Packet") - if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(c.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "close_eip_disconnect_request", readWriteModel.NewEipDisconnectRequest(c.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool { return true }, func(message spi.Message) error { return nil @@ -203,7 +203,7 @@ func (c *Connection) listServiceRequest(ctx context.Context) error { c.log.Debug().Msg("Sending ListServices Request") listServicesResultChan := make(chan readWriteModel.ListServicesResponse, 1) listServicesResultErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewListServicesRequest( + if err := c.messageCodec.SendRequest(ctx, "list_service_request", readWriteModel.NewListServicesRequest( EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), @@ -251,7 +251,7 @@ func (c *Connection) connectRegisterSession(ctx context.Context) error { c.log.Debug().Msg("Sending EipConnectionRequest") connectionResponseChan := make(chan readWriteModel.EipConnectionResponse, 1) connectionResponseErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest( + if err := c.messageCodec.SendRequest(ctx, "connect_register_session", readWriteModel.NewEipConnectionRequest( EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), @@ -299,7 +299,7 @@ func (c *Connection) connectRegisterSession(ctx context.Context) error { 0, typeIds, ) - if err := c.messageCodec.SendRequest(ctx, eipWrapper, func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "todo_what_is_this", eipWrapper, func(message spi.Message) bool { eipPacket := message.(readWriteModel.EipPacket) if eipPacket == nil { return false @@ -364,7 +364,7 @@ func (c *Connection) listAllAttributes(ctx context.Context) error { listAllAttributesErrorChan := make(chan error, 1) classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2))) instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1))) - if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData( + if err := c.messageCodec.SendRequest(ctx, "list_all_attributes", readWriteModel.NewCipRRData( c.sessionHandle, uint32(readWriteModel.CIPStatus_Success), c.senderContext, diff --git a/plc4go/internal/eip/MessageCodec.go b/plc4go/internal/eip/MessageCodec.go index eb7fa8cf16..0ca7459d79 100644 --- a/plc4go/internal/eip/MessageCodec.go +++ b/plc4go/internal/eip/MessageCodec.go @@ -55,8 +55,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct eipPacket := message.(model.EipPacket) // Serialize the request diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go index 5cc4d6c49d..70f0628e0e 100644 --- a/plc4go/internal/eip/Reader.go +++ b/plc4go/internal/eip/Reader.go @@ -110,7 +110,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) transaction.Submit(func(transactionContext context.Context, transaction transactions.RequestTransaction) { ctx, cancel := context.WithCancel(ctx) context.AfterFunc(transactionContext, cancel) - if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "read", request, func(message spi.Message) bool { eipPacket := message.(readWriteModel.EipPacket) if eipPacket == nil { return false diff --git a/plc4go/internal/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/knxnetip/ConnectionInternalOperations.go index 7eda53bfba..34f0a872ce 100644 --- a/plc4go/internal/knxnetip/ConnectionInternalOperations.go +++ b/plc4go/internal/knxnetip/ConnectionInternalOperations.go @@ -57,7 +57,7 @@ func (m *Connection) sendGatewaySearchRequest(ctx context.Context) (driverModel. result := make(chan driverModel.SearchResponse, 1) errorResult := make(chan error, 1) - if err = m.messageCodec.SendRequest(ctx, searchRequest, func(message spi.Message) bool { + if err = m.messageCodec.SendRequest(ctx, "gateway_search_request", searchRequest, func(message spi.Message) bool { _, ok := message.(driverModel.SearchResponse) return ok }, func(message spi.Message) error { @@ -102,7 +102,7 @@ func (m *Connection) sendGatewayConnectionRequest(ctx context.Context) (driverMo result := make(chan driverModel.ConnectionResponse, 1) errorResult := make(chan error, 1) - if err = m.messageCodec.SendRequest(ctx, connectionRequest, func(message spi.Message) bool { + if err = m.messageCodec.SendRequest(ctx, "gateway_connection_request", connectionRequest, func(message spi.Message) bool { _, ok := message.(driverModel.ConnectionResponse) return ok }, func(message spi.Message) error { @@ -147,7 +147,7 @@ func (m *Connection) sendGatewayDisconnectionRequest(ctx context.Context) (drive result := make(chan driverModel.DisconnectResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, disconnectRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "disconnect_request", disconnectRequest, func(message spi.Message) bool { _, ok := message.(driverModel.DisconnectResponse) return ok }, func(message spi.Message) error { @@ -189,7 +189,7 @@ func (m *Connection) sendConnectionStateRequest(ctx context.Context) (driverMode result := make(chan driverModel.ConnectionStateResponse, 1) errorResult := make(chan error, 1) - err = m.messageCodec.SendRequest(ctx, connectionStateRequest, func(message spi.Message) bool { + err = m.messageCodec.SendRequest(ctx, "connection_state_request", connectionStateRequest, func(message spi.Message) bool { _, ok := message.(driverModel.ConnectionStateResponse) return ok }, func(message spi.Message) error { @@ -246,7 +246,7 @@ func (m *Connection) sendGroupAddressReadRequest(ctx context.Context, groupAddre result := make(chan driverModel.ApduDataGroupValueResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, groupAddressReadRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_group_address_read_request", groupAddressReadRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -326,7 +326,7 @@ func (m *Connection) sendDeviceConnectionRequest(ctx context.Context, targetAddr result := make(chan driverModel.ApduControlConnect, 1) errorResult := make(chan error, 1) - err := m.messageCodec.SendRequest(ctx, deviceConnectionRequest, func(message spi.Message) bool { + err := m.messageCodec.SendRequest(ctx, "send_device_connection_request", deviceConnectionRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -415,7 +415,7 @@ func (m *Connection) sendDeviceDisconnectionRequest(ctx context.Context, targetA result := make(chan driverModel.ApduControlDisconnect, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, deviceDisconnectionRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_device_disconnection_request", deviceDisconnectionRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -514,7 +514,7 @@ func (m *Connection) sendDeviceAuthentication(ctx context.Context, targetAddress result := make(chan driverModel.ApduDataExtAuthorizeResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, deviceAuthenticationRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_device_authentication", deviceAuthenticationRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -624,7 +624,7 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(ctx context.Context, result := make(chan driverModel.ApduDataDeviceDescriptorResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, deviceDescriptorReadRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_device_descriptor_read_request", deviceDescriptorReadRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -727,7 +727,7 @@ func (m *Connection) sendDevicePropertyReadRequest(ctx context.Context, targetAd result := make(chan driverModel.ApduDataExtPropertyValueResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "send_device_property_read_request", propertyReadRequest, func(message spi.Message) bool { tunnelingRequest, ok := message.(driverModel.TunnelingRequest) if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { return false @@ -835,70 +835,77 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(ctx context.Contex result := make(chan driverModel.ApduDataExtPropertyDescriptionResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool { - tunnelingRequest, ok := message.(driverModel.TunnelingRequest) - if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { - return false - } - lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataInd) - if !ok { - return false - } - dataFrameExt, ok := lDataInd.GetDataFrame().(driverModel.LDataExtended) - if !ok { - return false - } - // Check if the address matches - if dataFrameExt.GetSourceAddress() != targetAddress { - return false - } - // Check if the counter matches - if dataFrameExt.GetApdu().GetCounter() != counter { - return false - } - dataContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) - if !ok { - return false - } - dataApduOther, ok := dataContainer.GetDataApdu().(driverModel.ApduDataOther) - if !ok { - return false - } - propertyDescriptionResponse, ok := dataApduOther.GetExtendedApdu().(driverModel.ApduDataExtPropertyDescriptionResponse) - if !ok { - return false - } - return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId - }, func(message spi.Message) error { - tunnelingRequest := message.(driverModel.TunnelingRequest) - lDataInd := tunnelingRequest.GetCemi().(driverModel.LDataInd) - dataFrameExt := lDataInd.GetDataFrame().(driverModel.LDataExtended) - dataContainer := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) - dataApduOther := dataContainer.GetDataApdu().(driverModel.ApduDataOther) - propertyDescriptionResponse := dataApduOther.GetExtendedApdu().(driverModel.ApduDataExtPropertyDescriptionResponse) - - // Acknowledge the receipt - _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) { - // If the error flag is set, there was an error authenticating - if lDataInd.GetDataFrame().GetErrorFlag() { - errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress)) - } else if err != nil { - errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress)) - } else { - result <- propertyDescriptionResponse + if err := m.messageCodec.SendRequest( + ctx, + "send_device_property_description_read_request", + propertyReadRequest, + func(message spi.Message) bool { + tunnelingRequest, ok := message.(driverModel.TunnelingRequest) + if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { + return false } - }) - - return nil - }, func(err error) error { - // If this is a timeout, do a check if the connection requires a reconnection - var timeoutError utils.TimeoutError - if errors.As(err, &timeoutError) { - m.handleTimeout() - } - errorResult <- errors.Wrapf(err, "got error processing request") - return nil - }); err != nil { + lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataInd) + if !ok { + return false + } + dataFrameExt, ok := lDataInd.GetDataFrame().(driverModel.LDataExtended) + if !ok { + return false + } + // Check if the address matches + if dataFrameExt.GetSourceAddress() != targetAddress { + return false + } + // Check if the counter matches + if dataFrameExt.GetApdu().GetCounter() != counter { + return false + } + dataContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) + if !ok { + return false + } + dataApduOther, ok := dataContainer.GetDataApdu().(driverModel.ApduDataOther) + if !ok { + return false + } + propertyDescriptionResponse, ok := dataApduOther.GetExtendedApdu().(driverModel.ApduDataExtPropertyDescriptionResponse) + if !ok { + return false + } + return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId + }, + func(message spi.Message) error { + tunnelingRequest := message.(driverModel.TunnelingRequest) + lDataInd := tunnelingRequest.GetCemi().(driverModel.LDataInd) + dataFrameExt := lDataInd.GetDataFrame().(driverModel.LDataExtended) + dataContainer := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) + dataApduOther := dataContainer.GetDataApdu().(driverModel.ApduDataOther) + propertyDescriptionResponse := dataApduOther.GetExtendedApdu().(driverModel.ApduDataExtPropertyDescriptionResponse) + + // Acknowledge the receipt + _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) { + // If the error flag is set, there was an error authenticating + if lDataInd.GetDataFrame().GetErrorFlag() { + errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress)) + } else if err != nil { + errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress)) + } else { + result <- propertyDescriptionResponse + } + }) + + return nil + }, + func(err error) error { + // If this is a timeout, do a check if the connection requires a reconnection + var timeoutError utils.TimeoutError + if errors.As(err, &timeoutError) { + m.handleTimeout() + } + errorResult <- errors.Wrapf(err, "got error processing request") + return nil + }, + ); err != nil { return nil, errors.Wrap(err, "got error sending property description read request") } @@ -942,67 +949,74 @@ func (m *Connection) sendDeviceMemoryReadRequest(ctx context.Context, targetAddr result := make(chan driverModel.ApduDataMemoryResponse, 1) errorResult := make(chan error, 1) - if err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool { - tunnelingRequest, ok := message.(driverModel.TunnelingRequest) - if !ok || - tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { - return false - } - lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataInd) - if !ok { - return false - } - dataFrameExt, ok := lDataInd.GetDataFrame().(driverModel.LDataExtended) - if !ok { - return false - } - dataContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) - if !ok { - return false - } - dataApduMemoryResponse, ok := dataContainer.GetDataApdu().(driverModel.ApduDataMemoryResponse) - if !ok { - return false - } - - // Check if the address matches - if dataFrameExt.GetSourceAddress() != targetAddress { - return false - } - // Check if the counter matches - if dataFrameExt.GetApdu().GetCounter() != counter { - return false - } - return dataApduMemoryResponse.GetAddress() == address - }, func(message spi.Message) error { - tunnelingRequest := message.(driverModel.TunnelingRequest) - lDataInd := tunnelingRequest.GetCemi().(driverModel.LDataInd) - dataFrameExt := lDataInd.GetDataFrame().(driverModel.LDataExtended) - dataContainer := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) - dataApduMemoryResponse := dataContainer.GetDataApdu().(driverModel.ApduDataMemoryResponse) - - // Acknowledge the receipt - _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) { - // If the error flag is set, there was an error authenticating - if lDataInd.GetDataFrame().GetErrorFlag() { - errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress)) - } else if err != nil { - errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress)) - } else { - result <- dataApduMemoryResponse + if err := m.messageCodec.SendRequest( + ctx, + "send_device_memory_read_request", + propertyReadRequest, + func(message spi.Message) bool { + tunnelingRequest, ok := message.(driverModel.TunnelingRequest) + if !ok || + tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { + return false + } + lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataInd) + if !ok { + return false + } + dataFrameExt, ok := lDataInd.GetDataFrame().(driverModel.LDataExtended) + if !ok { + return false + } + dataContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) + if !ok { + return false + } + dataApduMemoryResponse, ok := dataContainer.GetDataApdu().(driverModel.ApduDataMemoryResponse) + if !ok { + return false } - }) - return nil - }, func(err error) error { - // If this is a timeout, do a check if the connection requires a reconnection - var timeoutError utils.TimeoutError - if errors.As(err, &timeoutError) { - m.handleTimeout() - } - errorResult <- errors.Wrap(err, "got error processing request") - return nil - }); err != nil { + // Check if the address matches + if dataFrameExt.GetSourceAddress() != targetAddress { + return false + } + // Check if the counter matches + if dataFrameExt.GetApdu().GetCounter() != counter { + return false + } + return dataApduMemoryResponse.GetAddress() == address + }, + func(message spi.Message) error { + tunnelingRequest := message.(driverModel.TunnelingRequest) + lDataInd := tunnelingRequest.GetCemi().(driverModel.LDataInd) + dataFrameExt := lDataInd.GetDataFrame().(driverModel.LDataExtended) + dataContainer := dataFrameExt.GetApdu().(driverModel.ApduDataContainer) + dataApduMemoryResponse := dataContainer.GetDataApdu().(driverModel.ApduDataMemoryResponse) + + // Acknowledge the receipt + _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) { + // If the error flag is set, there was an error authenticating + if lDataInd.GetDataFrame().GetErrorFlag() { + errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress)) + } else if err != nil { + errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress)) + } else { + result <- dataApduMemoryResponse + } + }) + + return nil + }, + func(err error) error { + // If this is a timeout, do a check if the connection requires a reconnection + var timeoutError utils.TimeoutError + if errors.As(err, &timeoutError) { + m.handleTimeout() + } + errorResult <- errors.Wrap(err, "got error processing request") + return nil + }, + ); err != nil { return nil, errors.Wrap(err, "got error sending memory read request") } @@ -1035,51 +1049,58 @@ func (m *Connection) sendDeviceAck(ctx context.Context, targetAddress driverMode ), ) - if err := m.messageCodec.SendRequest(ctx, ack, func(message spi.Message) bool { - tunnelingRequest, ok := message.(driverModel.TunnelingRequest) - if !ok || - tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { - return false - } - lDataCon, ok := tunnelingRequest.GetCemi().(driverModel.LDataCon) - if !ok { - return false - } - dataFrameExt, ok := lDataCon.GetDataFrame().(driverModel.LDataExtended) - if !ok { - return false - } - // Check if the addresses match - if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress { - return false - } - ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) - curTargetAddress := ByteArrayToKnxAddress(ctxForModel, dataFrameExt.GetDestinationAddress()) - if curTargetAddress != targetAddress { - return false - } - // Check if the counter matches - if dataFrameExt.GetApdu().GetCounter() != counter { - return false - } - controlContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduControlContainer) - if !ok { - return false - } - _, ok = controlContainer.GetControlApdu().(driverModel.ApduControlAck) - return ok - }, func(message spi.Message) error { - callback(nil) - return nil - }, func(err error) error { - // If this is a timeout, do a check if the connection requires a reconnection - var timeoutError utils.TimeoutError - if errors.As(err, &timeoutError) { - m.handleTimeout() - } - callback(errors.Wrap(err, "got error processing request")) - return nil - }); err != nil { + if err := m.messageCodec.SendRequest( + ctx, + "send_device_ack", + ack, + func(message spi.Message) bool { + tunnelingRequest, ok := message.(driverModel.TunnelingRequest) + if !ok || + tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId { + return false + } + lDataCon, ok := tunnelingRequest.GetCemi().(driverModel.LDataCon) + if !ok { + return false + } + dataFrameExt, ok := lDataCon.GetDataFrame().(driverModel.LDataExtended) + if !ok { + return false + } + // Check if the addresses match + if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress { + return false + } + ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel)) + curTargetAddress := ByteArrayToKnxAddress(ctxForModel, dataFrameExt.GetDestinationAddress()) + if curTargetAddress != targetAddress { + return false + } + // Check if the counter matches + if dataFrameExt.GetApdu().GetCounter() != counter { + return false + } + controlContainer, ok := dataFrameExt.GetApdu().(driverModel.ApduControlContainer) + if !ok { + return false + } + _, ok = controlContainer.GetControlApdu().(driverModel.ApduControlAck) + return ok + }, + func(message spi.Message) error { + callback(nil) + return nil + }, + func(err error) error { + // If this is a timeout, do a check if the connection requires a reconnection + var timeoutError utils.TimeoutError + if errors.As(err, &timeoutError) { + m.handleTimeout() + } + callback(errors.Wrap(err, "got error processing request")) + return nil + }, + ); err != nil { return errors.Wrap(err, "got error sending ack request") } diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go index d63501e901..af96d746b7 100644 --- a/plc4go/internal/knxnetip/Discoverer.go +++ b/plc4go/internal/knxnetip/Discoverer.go @@ -232,7 +232,7 @@ func (d *Discoverer) createDeviceScanDispatcher(ctx context.Context, udpTranspor driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port)) searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint) // Send the search request. - if err := codec.Send(ctx, searchRequestMessage); err != nil { + if err := codec.Send(ctx, "device_scan_search_request", searchRequestMessage); err != nil { d.log.Debug().Err(err).Interface("searchRequestMessage", searchRequestMessage).Msg("Error sending message") return } diff --git a/plc4go/internal/knxnetip/MessageCodec.go b/plc4go/internal/knxnetip/MessageCodec.go index 57d609ed46..c7778606ac 100644 --- a/plc4go/internal/knxnetip/MessageCodec.go +++ b/plc4go/internal/knxnetip/MessageCodec.go @@ -62,8 +62,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct knxMessage := message.(model.KnxNetIpMessage) // Serialize the request @@ -134,7 +134,7 @@ func CustomMessageHandling(localLog zerolog.Logger) _default.CustomMessageHandle tunnelingRequest.GetTunnelingRequestDataBlock().GetSequenceCounter(), model.Status_NO_ERROR), ) - err := codec.Send(ctx, response) // TODO: where is a good place to get this timeout from? + err := codec.Send(ctx, "tunneling_request", response) // TODO: where is a good place to get this timeout from? if err != nil { localLog.Warn().Err(err).Msg("got an error sending ACK from transport") } diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go index 329d4c140f..2deb222122 100644 --- a/plc4go/internal/modbus/Connection.go +++ b/plc4go/internal/modbus/Connection.go @@ -110,7 +110,7 @@ func (c *Connection) Ping(ctx context.Context) error { successChan := make(chan struct{}, 1) diagnosticRequestPdu := readWriteModel.NewModbusPDUDiagnosticRequest(0, 0x42) pingRequest := readWriteModel.NewModbusTcpADU(1, c.unitIdentifier, diagnosticRequestPdu) - if err := c.messageCodec.SendRequest(ctx, pingRequest, func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "ping", pingRequest, func(message spi.Message) bool { responseAdu, ok := message.(readWriteModel.ModbusTcpADU) if !ok { return false diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go index ded3d67aad..8ccdebb235 100644 --- a/plc4go/internal/modbus/MessageCodec.go +++ b/plc4go/internal/modbus/MessageCodec.go @@ -58,8 +58,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct tcpAdu := message.(model.ModbusTcpADU) // Serialize the request diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go index dcd13c85e0..e34066cf52 100644 --- a/plc4go/internal/modbus/Reader.go +++ b/plc4go/internal/modbus/Reader.go @@ -131,7 +131,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) // Send the ADU over the wire m.log.Trace().Msg("Send ADU") - if err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool { + if err = m.messageCodec.SendRequest(ctx, "read", requestAdu, func(message spi.Message) bool { responseAdu := message.(readWriteModel.ModbusTcpADU) return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) && responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go index 75e336cbce..e669a90947 100644 --- a/plc4go/internal/modbus/Writer.go +++ b/plc4go/internal/modbus/Writer.go @@ -120,7 +120,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques requestAdu := readWriteModel.NewModbusTcpADU(uint16(transactionIdentifier), m.unitIdentifier, pdu) // Send the ADU over the wire - if err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool { + if err = m.messageCodec.SendRequest(ctx, "write", requestAdu, func(message spi.Message) bool { responseAdu := message.(readWriteModel.ModbusTcpADU) return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) && responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier diff --git a/plc4go/internal/opcua/MessageCodec.go b/plc4go/internal/opcua/MessageCodec.go index 49d54f9e4c..596014abd5 100644 --- a/plc4go/internal/opcua/MessageCodec.go +++ b/plc4go/internal/opcua/MessageCodec.go @@ -60,8 +60,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Interface("message", message).Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Interface("message", message).Msg("Sending message") // Cast the message to the correct type of struct opcuaApu, ok := message.(readWriteModel.OpcuaAPU) if !ok { diff --git a/plc4go/internal/opcua/SecureChannel.go b/plc4go/internal/opcua/SecureChannel.go index ae2be89df2..cf78f22bd0 100644 --- a/plc4go/internal/opcua/SecureChannel.go +++ b/plc4go/internal/opcua/SecureChannel.go @@ -214,7 +214,7 @@ func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDi requestConsumer := func(transactionId int32) { var messageBuffer []byte - if err := codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := codec.SendRequest(ctx, "submis", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -298,7 +298,7 @@ func (s *SecureChannel) onConnect(ctx context.Context, connection *Connection) e errChan := make(chan error, 1) requestConsumer := func(transactionId int32) { s.log.Trace().Int32("transactionId", transactionId).Msg("request consumer called") - if err := s.codec.SendRequest(ctx, hello, func(message spi.Message) bool { + if err := s.codec.SendRequest(ctx, "hello", hello, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -425,7 +425,7 @@ func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, connecti } requestConsumer := func(transactionId int32) { - if err := s.codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := s.codec.SendRequest(ctx, "open_secure_channel", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -834,7 +834,7 @@ func (s *SecureChannel) onDisconnectCloseSecureChannel(ctx context.Context, conn apu := readWriteModel.NewOpcuaAPU(closeRequest) requestConsumer := func(transactionId int32) { - if err := connection.messageCodec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := connection.messageCodec.SendRequest(ctx, "disconnect_secure_channel", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -886,7 +886,7 @@ func (s *SecureChannel) onDiscover(ctx context.Context, codec *MessageCodec) { apu := readWriteModel.NewOpcuaAPU(hello) requestConsumer := func(transactionId int32) { - if err := codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := codec.SendRequest(ctx, "on_discover_hello", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -977,7 +977,7 @@ func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec * apu := readWriteModel.NewOpcuaAPU(openRequest) requestConsumer := func(transactionId int32) { - if err := codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := codec.SendRequest(ctx, "on_discover_open_secure_channes", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -1093,7 +1093,7 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec apu := readWriteModel.NewOpcuaAPU(messageRequest) requestConsumer := func(transactionId int32) { - if err := codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := codec.SendRequest(ctx, "get_endpoints_request", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -1196,7 +1196,7 @@ func (s *SecureChannel) onDiscoverCloseSecureChannel(ctx context.Context, codec apu := readWriteModel.NewOpcuaAPU(closeRequest) requestConsumer := func(transactionId int32) { - if err := codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := codec.SendRequest(ctx, "on_discover_close_secure_channel", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") @@ -1324,7 +1324,7 @@ func (s *SecureChannel) keepAlive() { } requestConsumer := func(transactionId int32) { - if err := s.codec.SendRequest(ctx, apu, func(message spi.Message) bool { + if err := s.codec.SendRequest(ctx, "keep_alive", apu, func(message spi.Message) bool { opcuaAPU, ok := message.(readWriteModel.OpcuaAPU) if !ok { s.log.Debug().Type("type", message).Msg("Not relevant") diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go index 7b58b3f0fc..69e4a9d638 100644 --- a/plc4go/internal/s7/Connection.go +++ b/plc4go/internal/s7/Connection.go @@ -158,7 +158,7 @@ func (c *Connection) setupConnection(ctx context.Context) error { // Open the session on ISO Transport Protocol first. cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse, 1) cotpConnectionErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(c.createCOTPConnectionRequest()), func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "setup_connection", readWriteModel.NewTPKTPacket(c.createCOTPConnectionRequest()), func(message spi.Message) bool { tpktPacket := message.(readWriteModel.TPKTPacket) if tpktPacket == nil { return false @@ -190,7 +190,7 @@ func (c *Connection) setupConnection(ctx context.Context) error { // Send an S7 login message. s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication, 1) s7ConnectionErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, c.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "setup_connection_connection_request", c.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool { tpktPacket, ok := message.(readWriteModel.TPKTPacket) if !ok { return false @@ -254,7 +254,7 @@ func (c *Connection) setupConnection(ctx context.Context) error { c.log.Debug().Msg("Sending S7 Identification Request") s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData, 1) s7IdentificationErrorChan := make(chan error, 1) - if err := c.messageCodec.SendRequest(ctx, c.createIdentifyRemoteMessage(), func(message spi.Message) bool { + if err := c.messageCodec.SendRequest(ctx, "setup_connection_identify_remote_message", c.createIdentifyRemoteMessage(), func(message spi.Message) bool { tpktPacket, ok := message.(readWriteModel.TPKTPacket) if !ok { return false diff --git a/plc4go/internal/s7/MessageCodec.go b/plc4go/internal/s7/MessageCodec.go index fa2b1edc64..67a5b28974 100644 --- a/plc4go/internal/s7/MessageCodec.go +++ b/plc4go/internal/s7/MessageCodec.go @@ -55,8 +55,8 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec { return m } -func (m *MessageCodec) Send(ctx context.Context, message spi.Message) error { - m.log.Trace().Msg("Sending message") +func (m *MessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + m.log.Trace().Str("interactionId", interactionId).Msg("Sending message") // Cast the message to the correct type of struct tpktPacket := message.(model.TPKTPacket) // Serialize the request diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go index 7463f45f56..f9f3943e77 100644 --- a/plc4go/internal/s7/Reader.go +++ b/plc4go/internal/s7/Reader.go @@ -120,7 +120,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) // Send the over the wire m.log.Trace().Msg("Send ") - if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "read", tpktPacket, func(message spi.Message) bool { tpktPacket, ok := message.(readWriteModel.TPKTPacket) if !ok { return false diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go index ef14b3acd4..be58225801 100644 --- a/plc4go/internal/s7/Writer.go +++ b/plc4go/internal/s7/Writer.go @@ -111,7 +111,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques ctx, cancel := context.WithCancel(ctx) context.AfterFunc(transactionContext, cancel) // Send the over the wire - if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool { + if err := m.messageCodec.SendRequest(ctx, "write", tpktPacket, func(message spi.Message) bool { tpktPacket, ok := message.(readWriteModel.TPKTPacket) if !ok { return false diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go index 279e75a048..fcc0eadb7e 100644 --- a/plc4go/spi/MessageCodec.go +++ b/plc4go/spi/MessageCodec.go @@ -55,12 +55,12 @@ type MessageCodec interface { IsRunning() bool // Send is sending a given message - Send(ctx context.Context, message Message) error + Send(ctx context.Context, interactionId string, message Message) error // Expect Wait for a given timespan (defined by ctx or defaulting to default receive timeout) for a message to come // in, which returns 'true' for 'acceptMessage' and is then forwarded to the 'handleMessage' function - Expect(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) + Expect(ctx context.Context, interactionId string, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) // SendRequest A combination that sends a message first and then waits for a response. !!!Important note: the callbacks are blocking calls - SendRequest(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error + SendRequest(ctx context.Context, interactionId string, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error // GetDefaultIncomingMessageChannel gives back the chan where unexpected messages arrive GetDefaultIncomingMessageChannel() chan Message diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go index 30b0bf9e4e..a80bf6a305 100644 --- a/plc4go/spi/default/DefaultCodec.go +++ b/plc4go/spi/default/DefaultCodec.go @@ -41,7 +41,7 @@ import ( // DefaultCodecRequirements adds required methods to MessageCodec that are needed when using DefaultCodec type DefaultCodecRequirements interface { GetCodec() spi.MessageCodec - Send(ctx context.Context, message spi.Message) error + Send(ctx context.Context, interactionId string, message spi.Message) error Receive(ctx context.Context) (spi.Message, error) } @@ -206,16 +206,16 @@ func (m *defaultCodec) IsRunning() bool { return m.running.Load() } -func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { +func (m *defaultCodec) Expect(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { m.expectationsChangeMutex.Lock() defer m.expectationsChangeMutex.Unlock() ttl := m.receiveTimeout if deadline, ok := ctx.Deadline(); ok { ttl = time.Until(deadline) } - expectation := newDefaultExpectation(ctx, ttl, acceptsMessage, handleMessage, handleError) + expectation := newDefaultExpectation(ctx, interactionId, ttl, acceptsMessage, handleMessage, handleError) m.expectations = append(m.expectations, expectation) - m.log.Debug().Interface("expectation", expectation).Msg("Added expectation") + m.log.Debug().Str("interactionId", interactionId).Stringer("expectation", expectation).Msg("Added expectation") select { case m.notifyExpireWorker <- struct{}{}: default: @@ -226,13 +226,13 @@ func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMes } } -func (m *defaultCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { +func (m *defaultCodec) SendRequest(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { if err := ctx.Err(); err != nil { return errors.Wrap(err, "Not sending message as context is aborted") } - m.Expect(ctx, acceptsMessage, handleMessage, handleError) // We register the expectation first to avoid getting a response between sending and adding the expect - m.log.Trace().Msg("Sending request") - return m.Send(ctx, message) + m.Expect(ctx, interactionId, acceptsMessage, handleMessage, handleError) // We register the expectation first to avoid getting a response between sending and adding the expect + m.log.Trace().Str("interactionId", interactionId).Msg("Sending request") + return m.Send(ctx, interactionId, message) } func (m *defaultCodec) TimeoutExpectations(now time.Time) time.Duration { @@ -281,12 +281,12 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool { messageHandled := false m.log.Trace().Int("nExpectations", len(m.expectations)).Msg("Current number of expectations") m.expectations = slices.DeleteFunc(m.expectations, func(expectation spi.Expectation) bool { - expectationLog := m.log.With().Interface("expectation", expectation).Logger() + expectationLog := m.log.With().Stringer("expectation", expectation).Logger() expectationLog.Trace().Msg("Checking expectation") // Check if the current message matches the expectations // If it does, let it handle the message. if accepts := expectation.GetAcceptsMessage()(message); accepts { - expectationLog.Debug().Msg("accepts message") + expectationLog.Trace().Interface("handleMessage", message).Msg("accepts message") // TODO: decouple from worker thread if err := expectation.GetHandleMessage()(message); err != nil { expectationLog.Debug().Err(err).Msg("errored handling the message") @@ -302,7 +302,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool { messageHandled = true return true } else { - expectationLog.Trace().Msg("doesn't accept message") + expectationLog.Trace().Interface("handleMessage", message).Msg("doesn't accept message") return false } }) diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go index 79fa9f826d..2918ba97c5 100644 --- a/plc4go/spi/default/DefaultCodec_test.go +++ b/plc4go/spi/default/DefaultCodec_test.go @@ -498,6 +498,7 @@ func Test_defaultCodec_Expect(t *testing.T) { } type args struct { ctx context.Context + interactionId string acceptsMessage spi.AcceptsMessage handleMessage spi.HandleMessage handleError spi.HandleError @@ -513,6 +514,7 @@ func Test_defaultCodec_Expect(t *testing.T) { name: "expect it", setup: func(t *testing.T, fields *fields, args *args) { args.ctx = testutils.TestContext(t) + args.interactionId = t.Name() var cancelFunc context.CancelFunc args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) @@ -534,7 +536,7 @@ func Test_defaultCodec_Expect(t *testing.T) { customMessageHandling: tt.fields.customMessageHandling, log: testutils.ProduceTestingLogger(t), } - m.Expect(tt.args.ctx, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError) + m.Expect(tt.args.ctx, tt.args.interactionId, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError) }) } } @@ -916,6 +918,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) { } type args struct { ctx context.Context + interactionId string message spi.Message acceptsMessage spi.AcceptsMessage handleMessage spi.HandleMessage @@ -933,10 +936,11 @@ func Test_defaultCodec_SendRequest(t *testing.T) { name: "send it", setup: func(t *testing.T, fields *fields, args *args) { requirements := NewMockDefaultCodecRequirements(t) - requirements.EXPECT().Send(mock.Anything, mock.Anything).Return(nil) + requirements.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Return(nil) fields.DefaultCodecRequirements = requirements args.ctx = testutils.TestContext(t) + args.interactionId = t.Name() var cancelFunc context.CancelFunc args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) @@ -951,6 +955,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) { ctx, cancelFunc := context.WithCancel(testutils.TestContext(t)) cancelFunc() args.ctx = ctx + args.interactionId = t.Name() }, wantErr: assert.Error, }, @@ -958,10 +963,11 @@ func Test_defaultCodec_SendRequest(t *testing.T) { name: "send it errors", setup: func(t *testing.T, fields *fields, args *args) { requirements := NewMockDefaultCodecRequirements(t) - requirements.EXPECT().Send(mock.Anything, mock.Anything).Return(errors.New("nope")) + requirements.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("nope")) fields.DefaultCodecRequirements = requirements args.ctx = testutils.TestContext(t) + args.interactionId = t.Name() var cancelFunc context.CancelFunc args.ctx, cancelFunc = context.WithTimeout(args.ctx, 20*time.Second) t.Cleanup(cancelFunc) @@ -984,7 +990,7 @@ func Test_defaultCodec_SendRequest(t *testing.T) { customMessageHandling: tt.fields.customMessageHandling, log: testutils.ProduceTestingLogger(t), } - tt.wantErr(t, m.SendRequest(tt.args.ctx, tt.args.message, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError), fmt.Sprintf("SendRequest(%v, %v, func(), func(), func(), %v)", tt.args.ctx, tt.args.message, tt.args.ttl)) + tt.wantErr(t, m.SendRequest(tt.args.ctx, tt.args.interactionId, tt.args.message, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError), fmt.Sprintf("SendRequest(%v, %v, func(), func(), func(), %v)", tt.args.ctx, tt.args.message, tt.args.ttl)) }) } } @@ -1622,7 +1628,7 @@ func Test_defaultCodec_integration(t *testing.T) { }) // First expect var firstHandled bool - sut.Expect(t.Context(), func(message spi.Message) bool { + sut.Expect(t.Context(), "first", func(message spi.Message) bool { t.Log("accepts message", message) return true }, func(message spi.Message) error { @@ -1635,7 +1641,7 @@ func Test_defaultCodec_integration(t *testing.T) { }) // Second expect var secondHandled bool - sut.Expect(t.Context(), func(message spi.Message) bool { + sut.Expect(t.Context(), "second", func(message spi.Message) bool { t.Log("accepts message", message) return true }, func(message spi.Message) error { @@ -1648,7 +1654,7 @@ func Test_defaultCodec_integration(t *testing.T) { }) // Third expect var thridErrorCalled bool - sut.Expect(t.Context(), func(message spi.Message) bool { + sut.Expect(t.Context(), "third", func(message spi.Message) bool { t.Log("does not accept message", message) return false }, func(message spi.Message) error { @@ -1660,7 +1666,7 @@ func Test_defaultCodec_integration(t *testing.T) { }) // Fourth expect var fourthHandled bool - sut.Expect(t.Context(), func(message spi.Message) bool { + sut.Expect(t.Context(), "fourth", func(message spi.Message) bool { t.Log("accepts message", message) return true }, func(message spi.Message) error { diff --git a/plc4go/spi/default/defaultExpectation.go b/plc4go/spi/default/defaultExpectation.go index db9af30d17..627c9589dd 100644 --- a/plc4go/spi/default/defaultExpectation.go +++ b/plc4go/spi/default/defaultExpectation.go @@ -31,6 +31,7 @@ import ( type defaultExpectation struct { Uuid uuid.UUID + InteractionId string Ctx context.Context CancelFunc context.CancelCauseFunc CreationTime time.Time @@ -40,10 +41,11 @@ type defaultExpectation struct { HandleError spi.HandleError } -func newDefaultExpectation(ctx context.Context, ttl time.Duration, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) *defaultExpectation { +func newDefaultExpectation(ctx context.Context, interactionId string, ttl time.Duration, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) *defaultExpectation { ctx, cancelFunc := context.WithCancelCause(ctx) return &defaultExpectation{ Uuid: uuid.New(), + InteractionId: interactionId, Ctx: ctx, CancelFunc: cancelFunc, CreationTime: time.Now(), @@ -83,5 +85,5 @@ func (d *defaultExpectation) GetHandleError() spi.HandleError { } func (d *defaultExpectation) String() string { - return fmt.Sprintf("Expectation %s (expires at %v in %s)", d.Uuid, d.Expiration, time.Until(d.Expiration)) + return fmt.Sprintf("Expectation '%s' %s (expires at %v in %s)", d.InteractionId, d.Uuid, d.Expiration, time.Until(d.Expiration)) } diff --git a/plc4go/spi/default/mocks_test.go b/plc4go/spi/default/mocks_test.go index 2373d14683..51af0b4f34 100644 --- a/plc4go/spi/default/mocks_test.go +++ b/plc4go/spi/default/mocks_test.go @@ -431,16 +431,16 @@ func (_c *MockDefaultCodecRequirements_Receive_Call) RunAndReturn(run func(ctx c } // Send provides a mock function for the type MockDefaultCodecRequirements -func (_mock *MockDefaultCodecRequirements) Send(ctx context.Context, message spi.Message) error { - ret := _mock.Called(ctx, message) +func (_mock *MockDefaultCodecRequirements) Send(ctx context.Context, interactionId string, message spi.Message) error { + ret := _mock.Called(ctx, interactionId, message) if len(ret) == 0 { panic("no return value specified for Send") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, spi.Message) error); ok { - r0 = returnFunc(ctx, message) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, spi.Message) error); ok { + r0 = returnFunc(ctx, interactionId, message) } else { r0 = ret.Error(0) } @@ -454,24 +454,30 @@ type MockDefaultCodecRequirements_Send_Call struct { // Send is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message spi.Message -func (_e *MockDefaultCodecRequirements_Expecter) Send(ctx interface{}, message interface{}) *MockDefaultCodecRequirements_Send_Call { - return &MockDefaultCodecRequirements_Send_Call{Call: _e.mock.On("Send", ctx, message)} +func (_e *MockDefaultCodecRequirements_Expecter) Send(ctx interface{}, interactionId interface{}, message interface{}) *MockDefaultCodecRequirements_Send_Call { + return &MockDefaultCodecRequirements_Send_Call{Call: _e.mock.On("Send", ctx, interactionId, message)} } -func (_c *MockDefaultCodecRequirements_Send_Call) Run(run func(ctx context.Context, message spi.Message)) *MockDefaultCodecRequirements_Send_Call { +func (_c *MockDefaultCodecRequirements_Send_Call) Run(run func(ctx context.Context, interactionId string, message spi.Message)) *MockDefaultCodecRequirements_Send_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.Message + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.Message) + arg1 = args[1].(string) + } + var arg2 spi.Message + if args[2] != nil { + arg2 = args[2].(spi.Message) } run( arg0, arg1, + arg2, ) }) return _c @@ -482,7 +488,7 @@ func (_c *MockDefaultCodecRequirements_Send_Call) Return(err error) *MockDefault return _c } -func (_c *MockDefaultCodecRequirements_Send_Call) RunAndReturn(run func(ctx context.Context, message spi.Message) error) *MockDefaultCodecRequirements_Send_Call { +func (_c *MockDefaultCodecRequirements_Send_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message spi.Message) error) *MockDefaultCodecRequirements_Send_Call { _c.Call.Return(run) return _c } @@ -610,8 +616,8 @@ func (_c *MockDefaultCodec_Disconnect_Call) RunAndReturn(run func() error) *Mock } // Expect provides a mock function for the type MockDefaultCodec -func (_mock *MockDefaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { - _mock.Called(ctx, acceptsMessage, handleMessage, handleError) +func (_mock *MockDefaultCodec) Expect(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { + _mock.Called(ctx, interactionId, acceptsMessage, handleMessage, handleError) return } @@ -622,36 +628,42 @@ type MockDefaultCodec_Expect_Call struct { // Expect is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - acceptsMessage spi.AcceptsMessage // - handleMessage spi.HandleMessage // - handleError spi.HandleError -func (_e *MockDefaultCodec_Expecter) Expect(ctx interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockDefaultCodec_Expect_Call { - return &MockDefaultCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, acceptsMessage, handleMessage, handleError)} +func (_e *MockDefaultCodec_Expecter) Expect(ctx interface{}, interactionId interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockDefaultCodec_Expect_Call { + return &MockDefaultCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, interactionId, acceptsMessage, handleMessage, handleError)} } -func (_c *MockDefaultCodec_Expect_Call) Run(run func(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_Expect_Call { +func (_c *MockDefaultCodec_Expect_Call) Run(run func(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_Expect_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.AcceptsMessage + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.AcceptsMessage) + arg1 = args[1].(string) } - var arg2 spi.HandleMessage + var arg2 spi.AcceptsMessage if args[2] != nil { - arg2 = args[2].(spi.HandleMessage) + arg2 = args[2].(spi.AcceptsMessage) } - var arg3 spi.HandleError + var arg3 spi.HandleMessage if args[3] != nil { - arg3 = args[3].(spi.HandleError) + arg3 = args[3].(spi.HandleMessage) + } + var arg4 spi.HandleError + if args[4] != nil { + arg4 = args[4].(spi.HandleError) } run( arg0, arg1, arg2, arg3, + arg4, ) }) return _c @@ -662,7 +674,7 @@ func (_c *MockDefaultCodec_Expect_Call) Return() *MockDefaultCodec_Expect_Call { return _c } -func (_c *MockDefaultCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_Expect_Call { +func (_c *MockDefaultCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_Expect_Call { _c.Run(run) return _c } @@ -804,16 +816,16 @@ func (_c *MockDefaultCodec_IsRunning_Call) RunAndReturn(run func() bool) *MockDe } // Send provides a mock function for the type MockDefaultCodec -func (_mock *MockDefaultCodec) Send(ctx context.Context, message spi.Message) error { - ret := _mock.Called(ctx, message) +func (_mock *MockDefaultCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + ret := _mock.Called(ctx, interactionId, message) if len(ret) == 0 { panic("no return value specified for Send") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, spi.Message) error); ok { - r0 = returnFunc(ctx, message) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, spi.Message) error); ok { + r0 = returnFunc(ctx, interactionId, message) } else { r0 = ret.Error(0) } @@ -827,24 +839,30 @@ type MockDefaultCodec_Send_Call struct { // Send is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message spi.Message -func (_e *MockDefaultCodec_Expecter) Send(ctx interface{}, message interface{}) *MockDefaultCodec_Send_Call { - return &MockDefaultCodec_Send_Call{Call: _e.mock.On("Send", ctx, message)} +func (_e *MockDefaultCodec_Expecter) Send(ctx interface{}, interactionId interface{}, message interface{}) *MockDefaultCodec_Send_Call { + return &MockDefaultCodec_Send_Call{Call: _e.mock.On("Send", ctx, interactionId, message)} } -func (_c *MockDefaultCodec_Send_Call) Run(run func(ctx context.Context, message spi.Message)) *MockDefaultCodec_Send_Call { +func (_c *MockDefaultCodec_Send_Call) Run(run func(ctx context.Context, interactionId string, message spi.Message)) *MockDefaultCodec_Send_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.Message + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.Message) + arg1 = args[1].(string) + } + var arg2 spi.Message + if args[2] != nil { + arg2 = args[2].(spi.Message) } run( arg0, arg1, + arg2, ) }) return _c @@ -855,22 +873,22 @@ func (_c *MockDefaultCodec_Send_Call) Return(err error) *MockDefaultCodec_Send_C return _c } -func (_c *MockDefaultCodec_Send_Call) RunAndReturn(run func(ctx context.Context, message spi.Message) error) *MockDefaultCodec_Send_Call { +func (_c *MockDefaultCodec_Send_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message spi.Message) error) *MockDefaultCodec_Send_Call { _c.Call.Return(run) return _c } // SendRequest provides a mock function for the type MockDefaultCodec -func (_mock *MockDefaultCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { - ret := _mock.Called(ctx, message, acceptsMessage, handleMessage, handleError) +func (_mock *MockDefaultCodec) SendRequest(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { + ret := _mock.Called(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) if len(ret) == 0 { panic("no return value specified for SendRequest") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, spi.Message, spi.AcceptsMessage, spi.HandleMessage, spi.HandleError) error); ok { - r0 = returnFunc(ctx, message, acceptsMessage, handleMessage, handleError) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, spi.Message, spi.AcceptsMessage, spi.HandleMessage, spi.HandleError) error); ok { + r0 = returnFunc(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) } else { r0 = ret.Error(0) } @@ -884,35 +902,40 @@ type MockDefaultCodec_SendRequest_Call struct { // SendRequest is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message spi.Message // - acceptsMessage spi.AcceptsMessage // - handleMessage spi.HandleMessage // - handleError spi.HandleError -func (_e *MockDefaultCodec_Expecter) SendRequest(ctx interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockDefaultCodec_SendRequest_Call { - return &MockDefaultCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, message, acceptsMessage, handleMessage, handleError)} +func (_e *MockDefaultCodec_Expecter) SendRequest(ctx interface{}, interactionId interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockDefaultCodec_SendRequest_Call { + return &MockDefaultCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, interactionId, message, acceptsMessage, handleMessage, handleError)} } -func (_c *MockDefaultCodec_SendRequest_Call) Run(run func(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_SendRequest_Call { +func (_c *MockDefaultCodec_SendRequest_Call) Run(run func(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockDefaultCodec_SendRequest_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.Message + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.Message) + arg1 = args[1].(string) } - var arg2 spi.AcceptsMessage + var arg2 spi.Message if args[2] != nil { - arg2 = args[2].(spi.AcceptsMessage) + arg2 = args[2].(spi.Message) } - var arg3 spi.HandleMessage + var arg3 spi.AcceptsMessage if args[3] != nil { - arg3 = args[3].(spi.HandleMessage) + arg3 = args[3].(spi.AcceptsMessage) } - var arg4 spi.HandleError + var arg4 spi.HandleMessage if args[4] != nil { - arg4 = args[4].(spi.HandleError) + arg4 = args[4].(spi.HandleMessage) + } + var arg5 spi.HandleError + if args[5] != nil { + arg5 = args[5].(spi.HandleError) } run( arg0, @@ -920,6 +943,7 @@ func (_c *MockDefaultCodec_SendRequest_Call) Run(run func(ctx context.Context, m arg2, arg3, arg4, + arg5, ) }) return _c @@ -930,7 +954,7 @@ func (_c *MockDefaultCodec_SendRequest_Call) Return(err error) *MockDefaultCodec return _c } -func (_c *MockDefaultCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error) *MockDefaultCodec_SendRequest_Call { +func (_c *MockDefaultCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error) *MockDefaultCodec_SendRequest_Call { _c.Call.Return(run) return _c } @@ -3706,8 +3730,8 @@ func (_c *MockMessageCodec_Disconnect_Call) RunAndReturn(run func() error) *Mock } // Expect provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { - _mock.Called(ctx, acceptsMessage, handleMessage, handleError) +func (_mock *MockMessageCodec) Expect(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) { + _mock.Called(ctx, interactionId, acceptsMessage, handleMessage, handleError) return } @@ -3718,36 +3742,42 @@ type MockMessageCodec_Expect_Call struct { // Expect is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - acceptsMessage spi.AcceptsMessage // - handleMessage spi.HandleMessage // - handleError spi.HandleError -func (_e *MockMessageCodec_Expecter) Expect(ctx interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_Expect_Call { - return &MockMessageCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, acceptsMessage, handleMessage, handleError)} +func (_e *MockMessageCodec_Expecter) Expect(ctx interface{}, interactionId interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_Expect_Call { + return &MockMessageCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, interactionId, acceptsMessage, handleMessage, handleError)} } -func (_c *MockMessageCodec_Expect_Call) Run(run func(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_Expect_Call { +func (_c *MockMessageCodec_Expect_Call) Run(run func(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_Expect_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.AcceptsMessage + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.AcceptsMessage) + arg1 = args[1].(string) } - var arg2 spi.HandleMessage + var arg2 spi.AcceptsMessage if args[2] != nil { - arg2 = args[2].(spi.HandleMessage) + arg2 = args[2].(spi.AcceptsMessage) } - var arg3 spi.HandleError + var arg3 spi.HandleMessage if args[3] != nil { - arg3 = args[3].(spi.HandleError) + arg3 = args[3].(spi.HandleMessage) + } + var arg4 spi.HandleError + if args[4] != nil { + arg4 = args[4].(spi.HandleError) } run( arg0, arg1, arg2, arg3, + arg4, ) }) return _c @@ -3758,7 +3788,7 @@ func (_c *MockMessageCodec_Expect_Call) Return() *MockMessageCodec_Expect_Call { return _c } -func (_c *MockMessageCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_Expect_Call { +func (_c *MockMessageCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, interactionId string, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_Expect_Call { _c.Run(run) return _c } @@ -3900,16 +3930,16 @@ func (_c *MockMessageCodec_IsRunning_Call) RunAndReturn(run func() bool) *MockMe } // Send provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) Send(ctx context.Context, message spi.Message) error { - ret := _mock.Called(ctx, message) +func (_mock *MockMessageCodec) Send(ctx context.Context, interactionId string, message spi.Message) error { + ret := _mock.Called(ctx, interactionId, message) if len(ret) == 0 { panic("no return value specified for Send") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, spi.Message) error); ok { - r0 = returnFunc(ctx, message) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, spi.Message) error); ok { + r0 = returnFunc(ctx, interactionId, message) } else { r0 = ret.Error(0) } @@ -3923,24 +3953,30 @@ type MockMessageCodec_Send_Call struct { // Send is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message spi.Message -func (_e *MockMessageCodec_Expecter) Send(ctx interface{}, message interface{}) *MockMessageCodec_Send_Call { - return &MockMessageCodec_Send_Call{Call: _e.mock.On("Send", ctx, message)} +func (_e *MockMessageCodec_Expecter) Send(ctx interface{}, interactionId interface{}, message interface{}) *MockMessageCodec_Send_Call { + return &MockMessageCodec_Send_Call{Call: _e.mock.On("Send", ctx, interactionId, message)} } -func (_c *MockMessageCodec_Send_Call) Run(run func(ctx context.Context, message spi.Message)) *MockMessageCodec_Send_Call { +func (_c *MockMessageCodec_Send_Call) Run(run func(ctx context.Context, interactionId string, message spi.Message)) *MockMessageCodec_Send_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.Message + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.Message) + arg1 = args[1].(string) + } + var arg2 spi.Message + if args[2] != nil { + arg2 = args[2].(spi.Message) } run( arg0, arg1, + arg2, ) }) return _c @@ -3951,22 +3987,22 @@ func (_c *MockMessageCodec_Send_Call) Return(err error) *MockMessageCodec_Send_C return _c } -func (_c *MockMessageCodec_Send_Call) RunAndReturn(run func(ctx context.Context, message spi.Message) error) *MockMessageCodec_Send_Call { +func (_c *MockMessageCodec_Send_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message spi.Message) error) *MockMessageCodec_Send_Call { _c.Call.Return(run) return _c } // SendRequest provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { - ret := _mock.Called(ctx, message, acceptsMessage, handleMessage, handleError) +func (_mock *MockMessageCodec) SendRequest(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error { + ret := _mock.Called(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) if len(ret) == 0 { panic("no return value specified for SendRequest") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, spi.Message, spi.AcceptsMessage, spi.HandleMessage, spi.HandleError) error); ok { - r0 = returnFunc(ctx, message, acceptsMessage, handleMessage, handleError) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, spi.Message, spi.AcceptsMessage, spi.HandleMessage, spi.HandleError) error); ok { + r0 = returnFunc(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) } else { r0 = ret.Error(0) } @@ -3980,35 +4016,40 @@ type MockMessageCodec_SendRequest_Call struct { // SendRequest is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message spi.Message // - acceptsMessage spi.AcceptsMessage // - handleMessage spi.HandleMessage // - handleError spi.HandleError -func (_e *MockMessageCodec_Expecter) SendRequest(ctx interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_SendRequest_Call { - return &MockMessageCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, message, acceptsMessage, handleMessage, handleError)} +func (_e *MockMessageCodec_Expecter) SendRequest(ctx interface{}, interactionId interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_SendRequest_Call { + return &MockMessageCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, interactionId, message, acceptsMessage, handleMessage, handleError)} } -func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_SendRequest_Call { +func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError)) *MockMessageCodec_SendRequest_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 spi.Message + var arg1 string if args[1] != nil { - arg1 = args[1].(spi.Message) + arg1 = args[1].(string) } - var arg2 spi.AcceptsMessage + var arg2 spi.Message if args[2] != nil { - arg2 = args[2].(spi.AcceptsMessage) + arg2 = args[2].(spi.Message) } - var arg3 spi.HandleMessage + var arg3 spi.AcceptsMessage if args[3] != nil { - arg3 = args[3].(spi.HandleMessage) + arg3 = args[3].(spi.AcceptsMessage) } - var arg4 spi.HandleError + var arg4 spi.HandleMessage if args[4] != nil { - arg4 = args[4].(spi.HandleError) + arg4 = args[4].(spi.HandleMessage) + } + var arg5 spi.HandleError + if args[5] != nil { + arg5 = args[5].(spi.HandleError) } run( arg0, @@ -4016,6 +4057,7 @@ func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, m arg2, arg3, arg4, + arg5, ) }) return _c @@ -4026,7 +4068,7 @@ func (_c *MockMessageCodec_SendRequest_Call) Return(err error) *MockMessageCodec return _c } -func (_c *MockMessageCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error) *MockMessageCodec_SendRequest_Call { +func (_c *MockMessageCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError) error) *MockMessageCodec_SendRequest_Call { _c.Call.Return(run) return _c } diff --git a/plc4go/spi/mocks_test.go b/plc4go/spi/mocks_test.go index 56348c89bd..9cb4800577 100644 --- a/plc4go/spi/mocks_test.go +++ b/plc4go/spi/mocks_test.go @@ -1489,8 +1489,8 @@ func (_c *MockMessageCodec_Disconnect_Call) RunAndReturn(run func() error) *Mock } // Expect provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) Expect(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) { - _mock.Called(ctx, acceptsMessage, handleMessage, handleError) +func (_mock *MockMessageCodec) Expect(ctx context.Context, interactionId string, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) { + _mock.Called(ctx, interactionId, acceptsMessage, handleMessage, handleError) return } @@ -1501,36 +1501,42 @@ type MockMessageCodec_Expect_Call struct { // Expect is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - acceptsMessage AcceptsMessage // - handleMessage HandleMessage // - handleError HandleError -func (_e *MockMessageCodec_Expecter) Expect(ctx interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_Expect_Call { - return &MockMessageCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, acceptsMessage, handleMessage, handleError)} +func (_e *MockMessageCodec_Expecter) Expect(ctx interface{}, interactionId interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_Expect_Call { + return &MockMessageCodec_Expect_Call{Call: _e.mock.On("Expect", ctx, interactionId, acceptsMessage, handleMessage, handleError)} } -func (_c *MockMessageCodec_Expect_Call) Run(run func(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_Expect_Call { +func (_c *MockMessageCodec_Expect_Call) Run(run func(ctx context.Context, interactionId string, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_Expect_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 AcceptsMessage + var arg1 string if args[1] != nil { - arg1 = args[1].(AcceptsMessage) + arg1 = args[1].(string) } - var arg2 HandleMessage + var arg2 AcceptsMessage if args[2] != nil { - arg2 = args[2].(HandleMessage) + arg2 = args[2].(AcceptsMessage) } - var arg3 HandleError + var arg3 HandleMessage if args[3] != nil { - arg3 = args[3].(HandleError) + arg3 = args[3].(HandleMessage) + } + var arg4 HandleError + if args[4] != nil { + arg4 = args[4].(HandleError) } run( arg0, arg1, arg2, arg3, + arg4, ) }) return _c @@ -1541,7 +1547,7 @@ func (_c *MockMessageCodec_Expect_Call) Return() *MockMessageCodec_Expect_Call { return _c } -func (_c *MockMessageCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_Expect_Call { +func (_c *MockMessageCodec_Expect_Call) RunAndReturn(run func(ctx context.Context, interactionId string, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_Expect_Call { _c.Run(run) return _c } @@ -1637,16 +1643,16 @@ func (_c *MockMessageCodec_IsRunning_Call) RunAndReturn(run func() bool) *MockMe } // Send provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) Send(ctx context.Context, message Message) error { - ret := _mock.Called(ctx, message) +func (_mock *MockMessageCodec) Send(ctx context.Context, interactionId string, message Message) error { + ret := _mock.Called(ctx, interactionId, message) if len(ret) == 0 { panic("no return value specified for Send") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, Message) error); ok { - r0 = returnFunc(ctx, message) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, Message) error); ok { + r0 = returnFunc(ctx, interactionId, message) } else { r0 = ret.Error(0) } @@ -1660,24 +1666,30 @@ type MockMessageCodec_Send_Call struct { // Send is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message Message -func (_e *MockMessageCodec_Expecter) Send(ctx interface{}, message interface{}) *MockMessageCodec_Send_Call { - return &MockMessageCodec_Send_Call{Call: _e.mock.On("Send", ctx, message)} +func (_e *MockMessageCodec_Expecter) Send(ctx interface{}, interactionId interface{}, message interface{}) *MockMessageCodec_Send_Call { + return &MockMessageCodec_Send_Call{Call: _e.mock.On("Send", ctx, interactionId, message)} } -func (_c *MockMessageCodec_Send_Call) Run(run func(ctx context.Context, message Message)) *MockMessageCodec_Send_Call { +func (_c *MockMessageCodec_Send_Call) Run(run func(ctx context.Context, interactionId string, message Message)) *MockMessageCodec_Send_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 Message + var arg1 string if args[1] != nil { - arg1 = args[1].(Message) + arg1 = args[1].(string) + } + var arg2 Message + if args[2] != nil { + arg2 = args[2].(Message) } run( arg0, arg1, + arg2, ) }) return _c @@ -1688,22 +1700,22 @@ func (_c *MockMessageCodec_Send_Call) Return(err error) *MockMessageCodec_Send_C return _c } -func (_c *MockMessageCodec_Send_Call) RunAndReturn(run func(ctx context.Context, message Message) error) *MockMessageCodec_Send_Call { +func (_c *MockMessageCodec_Send_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message Message) error) *MockMessageCodec_Send_Call { _c.Call.Return(run) return _c } // SendRequest provides a mock function for the type MockMessageCodec -func (_mock *MockMessageCodec) SendRequest(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error { - ret := _mock.Called(ctx, message, acceptsMessage, handleMessage, handleError) +func (_mock *MockMessageCodec) SendRequest(ctx context.Context, interactionId string, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error { + ret := _mock.Called(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) if len(ret) == 0 { panic("no return value specified for SendRequest") } var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, Message, AcceptsMessage, HandleMessage, HandleError) error); ok { - r0 = returnFunc(ctx, message, acceptsMessage, handleMessage, handleError) + if returnFunc, ok := ret.Get(0).(func(context.Context, string, Message, AcceptsMessage, HandleMessage, HandleError) error); ok { + r0 = returnFunc(ctx, interactionId, message, acceptsMessage, handleMessage, handleError) } else { r0 = ret.Error(0) } @@ -1717,35 +1729,40 @@ type MockMessageCodec_SendRequest_Call struct { // SendRequest is a helper method to define mock.On call // - ctx context.Context +// - interactionId string // - message Message // - acceptsMessage AcceptsMessage // - handleMessage HandleMessage // - handleError HandleError -func (_e *MockMessageCodec_Expecter) SendRequest(ctx interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_SendRequest_Call { - return &MockMessageCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, message, acceptsMessage, handleMessage, handleError)} +func (_e *MockMessageCodec_Expecter) SendRequest(ctx interface{}, interactionId interface{}, message interface{}, acceptsMessage interface{}, handleMessage interface{}, handleError interface{}) *MockMessageCodec_SendRequest_Call { + return &MockMessageCodec_SendRequest_Call{Call: _e.mock.On("SendRequest", ctx, interactionId, message, acceptsMessage, handleMessage, handleError)} } -func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_SendRequest_Call { +func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, interactionId string, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError)) *MockMessageCodec_SendRequest_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { arg0 = args[0].(context.Context) } - var arg1 Message + var arg1 string if args[1] != nil { - arg1 = args[1].(Message) + arg1 = args[1].(string) } - var arg2 AcceptsMessage + var arg2 Message if args[2] != nil { - arg2 = args[2].(AcceptsMessage) + arg2 = args[2].(Message) } - var arg3 HandleMessage + var arg3 AcceptsMessage if args[3] != nil { - arg3 = args[3].(HandleMessage) + arg3 = args[3].(AcceptsMessage) } - var arg4 HandleError + var arg4 HandleMessage if args[4] != nil { - arg4 = args[4].(HandleError) + arg4 = args[4].(HandleMessage) + } + var arg5 HandleError + if args[5] != nil { + arg5 = args[5].(HandleError) } run( arg0, @@ -1753,6 +1770,7 @@ func (_c *MockMessageCodec_SendRequest_Call) Run(run func(ctx context.Context, m arg2, arg3, arg4, + arg5, ) }) return _c @@ -1763,7 +1781,7 @@ func (_c *MockMessageCodec_SendRequest_Call) Return(err error) *MockMessageCodec return _c } -func (_c *MockMessageCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error) *MockMessageCodec_SendRequest_Call { +func (_c *MockMessageCodec_SendRequest_Call) RunAndReturn(run func(ctx context.Context, interactionId string, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError) error) *MockMessageCodec_SendRequest_Call { _c.Call.Return(run) return _c } diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go index 2ed45e9af9..ffc21b3997 100644 --- a/plc4go/spi/transports/test/TransportInstance.go +++ b/plc4go/spi/transports/test/TransportInstance.go @@ -355,7 +355,7 @@ func (m *TransportInstance) transferFromChannel(ctx context.Context) (totalAvail case <-ctx.Done(): m.log.Trace().Msg("Context done") case newBytes := <-m.readChannel: - m.log.Trace().Dur("time", time.Since(start)).Msg("Got new bytes") + m.log.Trace().Dur("time", time.Since(start)).Int("nBytes", len(newBytes)).Msg("Got new bytes") totalAvailableBytes = m.appendRead(newBytes...) } return totalAvailableBytes
