This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch self-decode in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit ba52083758ceeaae8a798cd2cf6dae453ad96e7f Author: mrproliu <[email protected]> AuthorDate: Thu Dec 5 16:27:10 2024 +0900 Decode the BPF data manually instead of using `binary.Read` to reduce CPU usage --- CHANGES.md | 1 + pkg/accesslog/events/close.go | 11 ++ pkg/accesslog/events/connect.go | 24 ++++ pkg/accesslog/events/data.go | 17 +++ pkg/accesslog/events/detail.go | 31 +++++ pkg/accesslog/events/events_test.go | 241 ++++++++++++++++++++++++++++++++++++ pkg/accesslog/events/ztunnel.go | 13 ++ pkg/tools/btf/linker.go | 20 ++- pkg/tools/btf/reader/reader.go | 105 ++++++++++++++++ 9 files changed, 458 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1e5ea60..eb7e90c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. * Add a delay time before delete the connection in the access log module. * Fix context structs parameters for tracepoint programs. * Improve the build of skywalking-rover by adding some options. +* Decode the BPF data manually instead of using `binary.Read` to reduce CPU usage. #### Bug Fixes * Fix the base image cannot run in the arm64. 
diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go index de24669..acd0d61 100644 --- a/pkg/accesslog/events/close.go +++ b/pkg/accesslog/events/close.go @@ -20,6 +20,7 @@ package events import ( "time" + "github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/host" ) @@ -34,6 +35,16 @@ type SocketCloseEvent struct { Success uint32 } +func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) { + c.ConnectionID = r.ReadUint64() + c.RandomID = r.ReadUint64() + c.StartTime = r.ReadUint64() + c.EndTime = r.ReadUint64() + c.PID = r.ReadUint32() + c.SocketFD = r.ReadUint32() + c.Success = r.ReadUint32() +} + func (c *SocketCloseEvent) GetConnectionID() uint64 { return c.ConnectionID } diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go index dec4143..9433168 100644 --- a/pkg/accesslog/events/connect.go +++ b/pkg/accesslog/events/connect.go @@ -20,6 +20,7 @@ package events import ( "time" + "github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/host" ) @@ -46,6 +47,29 @@ type SocketConnectEvent struct { ConnTrackUpstreamPort uint32 } +func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) { + c.ConID = r.ReadUint64() + c.RandomID = r.ReadUint64() + c.StartTime = r.ReadUint64() + c.EndTime = r.ReadUint64() + c.PID = r.ReadUint32() + c.SocketFD = r.ReadUint32() + c.FuncName = r.ReadUint8() + c.Role = r.ReadUint8() + c.SocketFamily = r.ReadUint8() + c.ConnectSuccess = r.ReadUint8() + c.Pad0 = r.ReadUint32() + c.RemoteAddrV4 = r.ReadUint32() + c.RemoteAddrPort = r.ReadUint32() + r.ReadUint8Array(c.RemoteAddrV6[:], 16) + c.LocalAddrV4 = r.ReadUint32() + c.LocalAddrPort = r.ReadUint32() + r.ReadUint8Array(c.LocalAddrV6[:], 16) + c.ConnTrackUpstreamIPl = r.ReadUint64() + c.ConnTrackUpstreamIPh = r.ReadUint64() + c.ConnTrackUpstreamPort = r.ReadUint32() +} + func (c *SocketConnectEvent) GetConnectionID() uint64 { return c.ConID } diff 
--git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go index ed48059..83e4ba4 100644 --- a/pkg/accesslog/events/data.go +++ b/pkg/accesslog/events/data.go @@ -20,6 +20,7 @@ package events import ( "fmt" + "github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/enums" ) @@ -39,6 +40,22 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) { + s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8()) + s.HaveReduce = r.ReadUint8() + s.Direction0 = enums.SocketDataDirection(r.ReadUint8()) + s.Finished = r.ReadUint8() + s.Sequence0 = r.ReadUint16() + s.DataLen = r.ReadUint16() + s.StartTime0 = r.ReadUint64() + s.EndTime0 = r.ReadUint64() + s.ConnectionID = r.ReadUint64() + s.RandomID = r.ReadUint64() + s.DataID0 = r.ReadUint64() + s.TotalSize0 = r.ReadUint64() + r.ReadUint8Array(s.Buffer[:], 2048) +} + func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { return s.Protocol0 } diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go index 0f56edf..3917931 100644 --- a/pkg/accesslog/events/detail.go +++ b/pkg/accesslog/events/detail.go @@ -20,6 +20,7 @@ package events import ( "time" + "github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/buffer" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/apache/skywalking-rover/pkg/tools/host" @@ -72,6 +73,36 @@ type SocketDetailEvent struct { SSL uint8 } +func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) { + d.ConnectionID = r.ReadUint64() + d.RandomID = r.ReadUint64() + d.DataID0 = r.ReadUint64() + d.StartTime = r.ReadUint64() + d.EndTime = r.ReadUint64() + d.L4Duration = r.ReadUint32() + d.L3Duration = r.ReadUint32() + d.L3LocalDuration = r.ReadUint32() + d.L3OutputDuration = r.ReadUint32() + d.L3ResolveMacDuration = r.ReadUint32() + d.L3NetFilterDuration = r.ReadUint32() + d.L2Duration = r.ReadUint32() 
+ d.L2ReadySendDuration = r.ReadUint32() + d.L2SendDuration = r.ReadUint32() + d.L2PackageToQueueDuration = r.ReadUint32() + d.L3TotalRcvDuration = r.ReadUint32() + d.IfIndex = r.ReadUint32() + d.L4PackageRcvFromQueueDuration = r.ReadUint64() + d.L4TotalPackageSize = r.ReadUint64() + d.L2EnterQueueCount = r.ReadUint8() + d.L4PackageCount = r.ReadUint8() + d.L4RetransmitPackageCount = r.ReadUint8() + d.L3ResolveMacCount = r.ReadUint8() + d.L3NetFilterCount = r.ReadUint8() + d.FunctionName = enums.SocketFunctionName(r.ReadUint8()) + d.Protocol = enums.ConnectionProtocol(r.ReadUint8()) + d.SSL = r.ReadUint8() +} + func (d *SocketDetailEvent) GetConnectionID() uint64 { return d.ConnectionID } diff --git a/pkg/accesslog/events/events_test.go b/pkg/accesslog/events/events_test.go new file mode 100644 index 0000000..7cd375a --- /dev/null +++ b/pkg/accesslog/events/events_test.go @@ -0,0 +1,241 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package events + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "reflect" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/skywalking-rover/pkg/tools/btf/reader" +) + +func TestBufferRead(t *testing.T) { + tests := []struct { + hex string + create func() reader.EventReader + }{ + { + hex: ` +04 00 00 00 7a a1 00 00 4e 56 83 76 00 00 00 00 +47 3b e1 7a 5a 30 02 00 a6 bd e1 7a 5a 30 02 00 +7a a1 00 00 04 00 00 00 02 02 0a 01 00 00 00 00 +00 00 00 00 a0 b4 00 00 00 00 00 00 00 00 00 00 +00 00 ff ff 7f 00 00 01 00 00 00 00 fb 20 00 00 +00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00`, + create: func() reader.EventReader { + return &SocketConnectEvent{} + }, + }, + { + hex: ` +04 00 00 00 7a a1 00 00 4e 56 83 76 00 00 00 00 +01 00 00 00 00 00 00 00 b2 2d 26 7c 5a 30 02 00 +5e 34 26 7c 5a 30 02 00 82 51 08 00 bd 64 07 00 +27 15 00 00 1d 02 00 00 37 3c 00 00 00 00 00 00 +23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00 +24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00 +03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`, + create: func() reader.EventReader { + return &SocketDetailEvent{} + }, + }, + { + hex: ` +04 00 00 00 7a a1 00 00 4e 56 83 76 00 00 00 00 +b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00 +7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00`, + create: func() reader.EventReader { + return &SocketCloseEvent{} + }, + }, + { + hex: ` +03 00 02 01 00 00 2a 06 9c 5c fc 7b 5a 30 02 00 +20 c5 fd 7b 5a 30 02 00 04 00 00 00 7a a1 00 00 +4e 56 83 76 00 00 00 00 02 00 00 00 00 00 00 00 +2a 06 00 00 00 00 00 00 16 03 03 00 7a 02 00 00 +76 03 03 b1 01 85 b9 ce d4 95 9d 1e 90 f5 90 c3 +f5 99 70 64 ab 0f 48 a0 fa 0d 8d 69 a3 e6 6d a5 +29 98 d7 20 6f a3 13 ea 3c e1 d7 20 28 1f df 94 +90 27 e4 35 33 55 de 90 64 1c 3e 04 6f 8a ce c3 +71 45 c0 5e 13 01 00 00 2e 00 2b 00 02 03 04 00 +33 00 24 00 1d 00 20 00 f1 14 c3 d8 1c 40 
01 8e +9a d7 06 93 be 1d c0 a8 dd 42 53 96 19 51 ce aa +a6 80 4c d1 cc f1 15 14 03 03 00 01 01 17 03 03 +00 26 96 c9 a2 ec 5b d9 1b 81 38 a9 a1 b3 4e e3 +a7 ee c2 e3 00 21 44 6a cb 05 d1 cc 3f 59 c7 67 +29 f9 eb a9 b3 88 8b 43 17 03 03 03 8d cc f4 f1 +0c 23 44 96 ea 11 a3 f0 1d 2f 71 e2 29 ec 1f c2 +88 8a 04 a8 59 13 fe fd 45 24 aa 0b 64 b0 67 08 +41 17 39 f6 c7 ad 5d 0a 94 70 d9 89 74 6a 24 5a +91 7a 8d 92 a7 66 1c f9 10 17 52 0e df 9e 4c 24 +b4 23 1c bb 0c 78 a6 bb 8a 97 46 27 45 ae 3e 01 +83 12 ac 8a 75 12 68 f3 91 37 7e ae a8 41 61 82 +e6 48 a8 65 08 1b ae f5 28 92 b9 3c f8 47 93 77 +a9 f1 f9 a6 ec 67 6f 3d f9 00 df c7 da 43 27 c1 +8a 61 ac 28 6a aa 0d ce 99 25 c3 9e ab ee a7 ff +d8 0a e8 65 bd a0 4d a9 e8 0f 39 e6 b3 2f 95 ef +83 dd 31 31 d8 49 df 1a 5d 5c 51 76 7f bd 4b a3 +5d 08 da 25 c8 06 38 ba b4 d2 21 24 33 e1 b2 48 +8b af 2d bd 7b 32 9b 7c e6 b2 72 ba f2 fe 60 62 +db d9 b0 0a aa 34 3d f7 46 3a b2 d0 0a 84 fc 02 +a7 a9 d8 ca db 89 3b f4 a1 f1 de e5 5d 29 73 02 +2b 1b 8d 8d b7 06 1f a2 8e dd d4 6c 1d b8 1a 57 +8b 09 17 96 6f 00 62 75 c3 be 42 68 2a 29 1a 70 +f8 03 5b ae 69 cb 89 8a c1 00 34 d0 90 cf 69 5b +00 62 26 4a 74 d3 6d 0e 84 38 1a 29 da 7e d9 57 +44 22 75 f1 23 e8 8e e4 cb 80 ec 06 f7 c4 63 cb +0b ec a5 02 32 50 d9 92 40 f5 89 a7 18 10 79 c1 +fc d3 52 aa 15 8d 28 14 53 32 5c 46 db 4f 00 19 +5e 50 8c 17 e8 0e 36 71 1a 94 53 3c 03 42 0a 05 +8c 7d 7f 4e d3 a1 0b 90 aa a3 f7 9a a5 f9 a2 7f +36 4d 46 95 df 89 91 ef 01 ec 44 2c d1 79 b7 e8 +3f 1e 56 8e bb b6 fc c1 19 81 78 85 87 88 c4 f1 +64 69 df aa 33 f0 a9 1f aa 54 82 16 1f 4b 99 2b +18 38 9c bf 26 98 a7 12 f0 a2 04 de ef 98 63 da +49 ab f7 38 6d 0b 89 45 ee db c0 1e af bf d3 3a +27 6c 91 7a 9b d0 35 45 e7 65 c5 43 3d 70 68 03 +02 8d 68 c7 3f fe 1d 2b a4 0e 74 28 e9 82 21 9a +cb b1 b4 9e 91 01 53 89 51 d2 3d 37 b3 16 1c 3e +d8 5f 84 04 95 3a fc f5 9a b7 00 4c ba 10 72 31 +2f 6d 17 bd b8 9f 48 e5 3e 14 60 61 4c 33 86 a1 +bd 99 34 15 aa 61 39 89 97 91 3f dc 11 f7 25 d0 +5d 80 5e c5 dc 2c 03 d7 ab 2d 90 
93 3c e5 f5 3e +2c 16 76 48 0b 94 b5 00 5e 8f 97 cf 10 2d 46 d3 +50 18 c2 8f 58 ac bd cf 6e 4e 2f 6d cc 71 4f 00 +1d 33 4f 3c 57 06 d6 48 8e 50 a9 e3 19 1d b6 13 +1c 6a 1d 43 88 4d 57 5d e2 be 79 6b 2b 86 0b 52 +01 31 67 1a 59 a0 7f 0c be c4 cb 5e 7e 5e bb 39 +45 2c 68 10 7c 51 39 a0 ed 83 1e 35 1b c4 63 8b +b5 e2 7b 8a 9d a7 ac 02 a3 fc cc ec c2 db c0 59 +7b db 4b 27 b8 52 38 12 4d 05 38 bd 2b bd 73 c8 +a1 33 c4 da 69 6a ce 32 f4 62 51 c6 87 c2 d8 f8 +45 5b df 9c 18 5a 91 2e c7 f9 44 87 69 0d 44 70 +04 23 f7 da b7 1e 8a 81 c5 28 15 bc b4 83 fb c2 +ef b8 95 b7 37 aa 2d 85 22 8c b8 26 28 7d c7 83 +d5 fe 30 bf 9c a9 44 d1 d4 37 34 5b a4 ff 63 fc +e6 31 d0 11 6c 4a bd 1d 7a 70 80 25 54 70 d1 44 +45 74 ed b6 50 a5 4e 59 f8 c2 f5 99 3d f9 26 43 +cd 21 7e 72 60 ef 53 03 f3 6e e7 8e 86 68 5f f0 +cc b3 09 64 56 f6 f5 37 53 06 fe ec 3c e7 79 a5 +82 7f e0 d0 5f e3 77 0b 18 4a 03 1e 63 a1 53 64 +df 87 57 40 f5 c2 56 bb 73 cb ce 68 d2 da 6c 0f +4e 57 06 8d 95 5f a9 6d f0 70 d8 bb 83 85 80 56 +52 a5 3f bc 4a 21 45 89 d4 0a 17 03 03 01 19 4f +b6 ea 71 c8 69 7d fa 10 21 08 8b 93 b2 d2 06 5b +2b b5 60 e7 cf 0d 85 ad 3d c4 53 e5 b6 7c d2 35 +e6 97 23 95 fb 61 15 57 3c 4a 67 ec 61 26 4d 58 +ee 08 af 47 f7 90 b3 11 ba 41 6a be 79 db cf 88 +1f d5 04 89 c9 b0 f0 bc 85 30 87 82 88 ee 77 8d +f9 ff 9d 77 f6 50 03 93 88 ea 62 14 cf 47 d4 ad +f7 c4 e1 be 46 7b c0 fa ab b1 76 39 50 76 55 e9 +8c c6 c8 a8 13 fa a3 2e 9c 4f 32 7f 9c a4 dc f3 +1d e8 fe 3c be af 6d 21 e4 e0 e4 53 b1 cb 3f 63 +ac d9 d2 17 81 fa 33 88 8d 61 82 40 5f 56 0f 91 +a0 d7 a6 33 fd 59 09 f4 95 99 f6 57 dd d5 32 44 +6f a0 64 2e 74 0a 54 90 65 c2 93 61 18 b4 b0 5e +15 27 fa 4d 53 e6 1d aa 1b 13 a6 00 d0 b6 98 07 +9a b5 91 03 2f 55 40 69 c0 69 4e 48 33 f1 03 15 +cc f8 d2 0a ad 74 6a 37 5a 1b a8 bb fa 3f 04 8c +a8 b5 23 a0 50 2b 8f a5 fb 1d e4 1b 2f 11 bf e1 +4c 5a 7b 72 4f f4 d5 65 23 e8 26 22 47 ad 8a e0 +eb 0e b3 ee db 54 7c 23 17 03 03 00 35 8e 6c 95 +11 a0 76 73 22 67 3a 72 b6 02 30 fe 55 94 60 bb +33 4a c4 fd 7f 6b 00 2c 
10 37 4e 29 e8 f7 39 f9 +04 9d 92 97 93 12 ec d7 fe 9c fb 78 95 a2 c1 2d +74 d2 17 03 03 00 8b df 38 dc a2 d9 44 06 ce 79 +5a 6a e8 9f 97 83 e1 80 c2 84 3b 18 7f 16 d3 9e +fb 53 c9 03 b1 2b 66 fe 81 06 a8 89 4d a0 e3 64 +f7 39 53 b6 9c d4 4a 38 bc e4 db c8 d7 68 5e f1 +d7 6a 0c 49 4c 5c 28 f6 09 76 8e 15 0b 42 f6 1c +17 07 05 81 8a 05 23 50 cd b0 a6 a3 89 c9 ac 5d +35 35 33 15 4a 6f 31 80 a0 ea de 8e 56 e5 16 e5 +d7 f0 e3 f9 09 35 c2 be 9d 74 48 19 39 b8 c9 04 +70 9a 58 22 05 fc 68 78 52 b4 92 ab d2 14 66 97 +45 e6 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +00 00 00 00 00 00 00 00 00 00 00 00 +`, + create: func() 
reader.EventReader { + return &SocketDataUploadEvent{} + }, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%v", reflect.TypeOf(test.create())), func(t *testing.T) { + test.hex = strings.ReplaceAll(test.hex, "\n", "") + test.hex = strings.ReplaceAll(test.hex, " ", "") + rawData, err := hex.DecodeString(test.hex) + if err != nil { + t.Fatalf("Failed to decode hex string: %v", err) + return + } + binaryRead := test.create() + selfRead := test.create() + bufReader := reader.NewReader(rawData) + selfRead.ReadFrom(bufReader) + if err := bufReader.HasError(); err != nil { + t.Fatalf("reading by self parsing error: %v", err) + } + if err := binary.Read(bytes.NewBuffer(rawData), binary.LittleEndian, binaryRead); err != nil { + t.Fatalf("reading buffer error: %v", err) + } + // self parsing should same with binary.Read + assert.Equal(t, selfRead, binaryRead) + }) + } + +} diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go index ffdf793..abd2f1c 100644 --- a/pkg/accesslog/events/ztunnel.go +++ b/pkg/accesslog/events/ztunnel.go @@ -17,6 +17,8 @@ package events +import "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + type ZTunnelSocketMappingEvent struct { OriginalSrcIP uint32 OriginalDestIP uint32 @@ -27,3 +29,14 @@ type ZTunnelSocketMappingEvent struct { Pad0 uint16 Pad1 uint32 } + +func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) { + z.OriginalSrcIP = r.ReadUint32() + z.OriginalDestIP = r.ReadUint32() + z.OriginalSrcPort = r.ReadUint16() + z.OriginalDestPort = r.ReadUint16() + z.LoadBalancedDestIP = r.ReadUint32() + z.LoadBalancedDestPort = r.ReadUint16() + z.Pad0 = r.ReadUint16() + z.Pad1 = r.ReadUint32() +} diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go index c8b7c68..cbe13a0 100644 --- a/pkg/tools/btf/linker.go +++ b/pkg/tools/btf/linker.go @@ -30,6 +30,7 @@ import ( "golang.org/x/arch/arm64/arm64asm" "golang.org/x/arch/x86/x86asm" + 
"github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/elf" "github.com/apache/skywalking-rover/pkg/tools/process" @@ -158,7 +159,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, reader RingBufferReader, dataSup m.ReadEventAsyncWithBufferSize(emap, reader, os.Getpagesize(), dataSupplier) } -func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, reader RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) { +func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) { rd, err := perf.NewReader(emap, perCPUBuffer) if err != nil { m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err)) @@ -183,12 +184,21 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, reader RingBufferR } data := dataSupplier() - if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil { - log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) - continue + if r, ok := data.(reader.EventReader); ok { + reader := reader.NewReader(record.RawSample) + r.ReadFrom(reader) + if readErr := reader.HasError(); readErr != nil { + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + continue + } + } else { + if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil { + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + continue + } } - reader(data) + bufReader(data) } }() } diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader/reader.go new file mode 100644 index 0000000..d2e980e --- /dev/null +++ b/pkg/tools/btf/reader/reader.go @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license 
agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package reader + +import ( + "encoding/binary" + "fmt" +) + +// EventReader reads the sample data by itself, instead of using binary.Read +type EventReader interface { + // ReadFrom reads the buffer data + ReadFrom(reader *Reader) +} + +// Reader is a buffer sample reader +type Reader struct { + Sample []byte + CurrentOffset int + sampleLen int + err error +} + +// NewReader creates a reader from a BPF buffer +func NewReader(sample []byte) *Reader { + return &Reader{ + Sample: sample, + CurrentOffset: 0, + sampleLen: len(sample), + } +} + +// HasError returns the error, if any, encountered when reading the buffer +func (r *Reader) HasError() error { + return r.err +} + +func (r *Reader) ReadUint64() uint64 { + bytes, err := r.read(8) + if err != nil { + return 0 + } + return binary.LittleEndian.Uint64(bytes) +} + +func (r *Reader) ReadUint32() uint32 { + bytes, err := r.read(4) + if err != nil { + return 0 + } + return binary.LittleEndian.Uint32(bytes) +} + +func (r *Reader) ReadUint16() uint16 { + bytes, err := r.read(2) + if err != nil { + return 0 + } + return binary.LittleEndian.Uint16(bytes) +} + +func (r *Reader) ReadUint8() uint8 { + bytes, err := r.read(1) + if err != nil { + return 0 + } + return bytes[0] +} + +func (r *Reader) ReadUint8Array(a []uint8, size int) { + 
read, err := r.read(size) + if err != nil { + return + } + copy(a, read) +} + +func (r *Reader) read(size int) ([]byte, error) { + if r.err != nil { + return nil, r.err + } + if r.sampleLen < r.CurrentOffset+size { + r.err = fmt.Errorf("out of the buffer reader bound, current offset: %d, read size: %d, sample size: %d", + r.CurrentOffset, size, r.sampleLen) + return nil, r.err + } + bytes := r.Sample[r.CurrentOffset : r.CurrentOffset+size] + r.CurrentOffset += size + return bytes, nil +}
