This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch ringbuf in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit a48d3f7b3c203e573127a30b59e3f518e940aadf Author: mrproliu <[email protected]> AuthorDate: Mon Dec 16 10:21:37 2024 +0800 Introduce ringbuf queue in access log module --- bpf/accesslog/syscalls/transfer.h | 17 +-- bpf/include/queue.h | 61 +++++++++++ bpf/include/socket_data.h | 73 ++++++------ pkg/accesslog/bpf/loader.go | 2 +- pkg/accesslog/collector/protocols/queue.go | 4 +- .../continuous/checker/bpf/network/network.go | 3 +- pkg/profiling/task/network/bpf/bpf.go | 2 +- pkg/profiling/task/offcpu/runner.go | 2 +- pkg/tools/btf/ebpf.go | 41 ++++++- pkg/tools/btf/linker.go | 19 ++-- pkg/tools/btf/queue.go | 122 +++++++++++++++++++++ 11 files changed, 278 insertions(+), 68 deletions(-) diff --git a/bpf/accesslog/syscalls/transfer.h b/bpf/accesslog/syscalls/transfer.h index e1f1ba7..3f8c7e5 100644 --- a/bpf/accesslog/syscalls/transfer.h +++ b/bpf/accesslog/syscalls/transfer.h @@ -19,6 +19,7 @@ #include "socket_opts.h" #include "socket_data.h" #include "socket_reader.h" +#include "queue.h" #include "protocol_analyzer.h" #include "../common/connection.h" #include "../common/data_args.h" @@ -68,15 +69,7 @@ struct socket_detail_t { __u8 ssl; }; -struct { - __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); - __type(key, __u32); - __type(value, struct socket_detail_t); - __uint(max_entries, 1); -} socket_detail_event_per_cpu_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_detail_data_queue SEC(".maps"); +DATA_QUEUE(socket_detail, 1024 * 1024); static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count, __u32 data_direction, const bool vecs, __u8 func_name, bool ssl) { @@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_ // 1. when the SSL connection sends SSL(unencrypted) message // 2. 
when the not SSL connection sends plain data if (conn->ssl == ssl) { - __u32 kZero = 0; - struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero); + struct socket_detail_t *detail; + detail = rover_reserve_buf(&socket_detail, sizeof(*detail)); if (detail != NULL) { detail->connection_id = conid; detail->random_id = conn->random_id; @@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_ detail->l2_enter_queue_count = args->l2_enter_queue_count; detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time; - bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail)); + rover_submit_buf(ctx, &socket_detail, detail, sizeof(*detail)); } } diff --git a/bpf/include/queue.h b/bpf/include/queue.h new file mode 100644 index 0000000..c6b532c --- /dev/null +++ b/bpf/include/queue.h @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "api.h" + +#define DATA_QUEUE(name, size) \ + struct { \ + __uint(type, BPF_MAP_TYPE_RINGBUF); \ + __uint(max_entries, size); \ + } name SEC(".maps"); \ + const void *rover_data_queue_##name __attribute__((unused)); + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(max_entries, 1); + __uint(key_size, sizeof(__u32)); + __uint(value_size, 10240); // all events are less than 10KB +} rover_data_heap SEC(".maps"); + +static __always_inline void *rover_reserve_buf(void *map, __u64 size) { + static const int zero = 0; + + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_reserve)) + return bpf_ringbuf_reserve(map, size, 0); + + return bpf_map_lookup_elem(&rover_data_heap, &zero); +} + +static __always_inline void rover_discard_buf(void *buf) +{ + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_discard)) + bpf_ringbuf_discard(buf, 0); +} + +static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) { + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_submit)) { + bpf_ringbuf_submit(buf, 0); + return 0; + } + + return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size); +} \ No newline at end of file diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h index cae2622..0d62ccb 100644 --- a/bpf/include/socket_data.h +++ b/bpf/include/socket_data.h @@ -19,6 +19,7 @@ #include "socket_opts.h" #include "protocol_analyzer.h" +#include "queue.h" #define SOCKET_UPLOAD_CHUNK_LIMIT 12 @@ -43,9 +44,7 @@ struct { __type(value, struct socket_data_upload_event); __uint(max_entries, 1); } socket_data_upload_event_per_cpu_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_data_upload_event_queue SEC(".maps"); +DATA_QUEUE(socket_data_upload_queue, 1024 * 1024); struct socket_data_sequence_t { __u64 data_id; @@ -106,23 +105,40 @@ static __always_inline struct upload_data_args* generate_socket_upload_args() { 
return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero); } -static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) { - if (size <= 0) { +static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) { + struct socket_data_upload_event *socket_data_event; + socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event)); + if (socket_data_event == NULL) { return; } - if (size > sizeof(event->buffer)) { - size = sizeof(event->buffer); + + if (size > sizeof(socket_data_event->buffer)) { + size = sizeof(socket_data_event->buffer); + } + if (size <= 0) { + rover_discard_buf(socket_data_event); + return; } - event->sequence = index; - event->data_len = size; - event->finished = is_finished; - event->have_reduce_after_chunk = have_reduce_after_chunk; - bpf_probe_read(&event->buffer, size, buf); - bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event)); + // basic data + socket_data_event->start_time = args->start_time; + socket_data_event->end_time = args->end_time; + socket_data_event->protocol = args->connection_protocol; + socket_data_event->direction = args->data_direction; + socket_data_event->conid = args->con_id; + socket_data_event->randomid = args->random_id; + socket_data_event->total_size = args->bytes_count; + socket_data_event->data_id = args->socket_data_id; + + socket_data_event->sequence = index; + socket_data_event->data_len = size; + socket_data_event->finished = is_finished; + socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk; + bpf_probe_read(&socket_data_event->buffer, size, buf); + rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event)); } 
-static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) { +static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) { ssize_t already_send = 0; #pragma unroll for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) { @@ -141,9 +157,9 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t __u8 sequence = index; if (force_unfinished == 1 && need_send_in_chunk > 0) { is_finished = 0; - sequence = generate_socket_sequence(event->conid, event->data_id); + sequence = generate_socket_sequence(args->con_id, args->socket_data_id); } - __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); + __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); already_send += need_send_in_chunk; } @@ -170,12 +186,12 @@ if (iov_index < iovlen) { \ have_reduce_after_chunk = 1; \ } \ __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? 
true : false; \ - __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); \ + __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); \ already_send += need_send_in_chunk; \ loop_count++; \ } -static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) { +static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) { ssize_t already_send = 0; ssize_t cur_iov_sended = 0; __u8 iov_index = 0; @@ -198,26 +214,9 @@ static __inline void upload_socket_data(void *ctx, struct upload_data_args *args if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) { return; } - // generate event - __u32 kZero = 0; - struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero); - if (event == NULL) { - return; - } - - // basic data - event->start_time = args->start_time; - event->end_time = args->end_time; - event->protocol = args->connection_protocol; - event->direction = args->data_direction; - event->conid = args->con_id; - event->randomid = args->random_id; - event->total_size = args->bytes_count; - event->data_id = args->socket_data_id; - if (args->socket_data_buf != NULL) { - upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, event, args->socket_ssl_buffer_force_unfinished); + upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished); } else if (args->socket_data_iovec != NULL) { - upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, event); + 
upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args); } } \ No newline at end of file diff --git a/pkg/accesslog/bpf/loader.go b/pkg/accesslog/bpf/loader.go index 5a4f827..5058913 100644 --- a/pkg/accesslog/bpf/loader.go +++ b/pkg/accesslog/bpf/loader.go @@ -34,7 +34,7 @@ type Loader struct { func NewLoader() (*Loader, error) { objs := bpfObjects{} - if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil { + if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil { return nil, err } diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 68708fc..7e93dce 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -92,13 +92,13 @@ func (q *AnalyzeQueue) Start(ctx context.Context) { func(num int) btf.PartitionContext { return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context)) }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetail, int(q.perCPUBuffer), q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { return q.detailSupplier() }, func(data interface{}) string { return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID()) }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer), q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { return &events.SocketDataUploadEvent{} }, func(data interface{}) string { diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go index ba03f21..67bb39b 100644 --- a/pkg/profiling/continuous/checker/bpf/network/network.go +++ b/pkg/profiling/continuous/checker/bpf/network/network.go @@ -134,9 +134,10 @@ func 
startBPFIfNeed() error {
 	}
 
 	bpf = &bpfObjects{}
-	if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+	if err := btf.LoadBPFAndAssign(loadBpf, bpf); err != nil {
 		return err
 	}
+
 	bpfLinker = btf.NewLinker()
 	bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg": bpf.TcpSendmsg})
 	bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.TcpRecvmsg})
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
index 083935c..67a875f 100644
--- a/pkg/profiling/task/network/bpf/bpf.go
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -34,7 +34,7 @@ type Loader struct {
 
 func NewLoader() (*Loader, error) {
 	objs := bpfObjects{}
-	if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+	if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go
index 492d996..8ce3ee6 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -113,7 +113,7 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
 	if !replacedPid {
 		return fmt.Errorf("replace the monitor pid failure")
 	}
-	if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err1 != nil {
+	if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err1 != nil {
 		return err1
 	}
 	r.bpf = &objs
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 71e20f3..307f29f 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -20,6 +20,7 @@ package btf
 import (
 	"bytes"
 	"embed"
+	"errors"
 	"fmt"
 	"path/filepath"
 	"sync"
@@ -41,7 +42,22 @@ var (
 	log = logger.GetLogger("tools", "btf")
 )
 
-func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
+// LoadBPFAndAssign builds the collection spec through loadBPF and loads it into
+// objs with the options from GetEBPFCollectionOptionsIfNeed. objs must be a
+// pointer to a struct, as required by CollectionSpec.LoadAndAssign.
+func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interface{}) error {
+	bpf, err := loadBPF()
+	if err != nil {
+		return err
+	}
+
+	return bpf.LoadAndAssign(objs, GetEBPFCollectionOptionsIfNeed(bpf))
+}
+
+// GetEBPFCollectionOptionsIfNeed resolves the kernel BTF spec on first use,
+// applies the data queue adjustments to bpfSpec and returns collection options
+// that use the resolved spec as kernel types.
+func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
 	findBTFOnce.Do(func() {
 		readSpec, err := getKernelBTFAddress()
 		if err != nil {
@@ -52,6 +68,7 @@ func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
 		spec = readSpec
 	})
 
+	enhanceDataQueueOpts(bpfSpec)
 	return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
@@ -87,3 +104,34 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) {
 func asset(file string) ([]byte, error) {
 	return assets.ReadFile(filepath.ToSlash(file))
 }
+
+// validateGlobalConstVoidPtrVar checks that t is a global BTF variable of type
+// "const void *", the shape of the rover_data_queue_<name> marker declared by
+// the DATA_QUEUE macro.
+func validateGlobalConstVoidPtrVar(t btf.Type) error {
+	btfVar, ok := t.(*btf.Var)
+	if !ok {
+		return errors.New("not of type btf.Var")
+	}
+
+	if btfVar.Linkage != btf.GlobalVar {
+		return fmt.Errorf("%q is not a global variable", btfVar.Name)
+	}
+
+	btfPtr, ok := btfVar.Type.(*btf.Pointer)
+	if !ok {
+		return fmt.Errorf("%q is not a pointer", btfVar.Name)
+	}
+
+	btfConst, ok := btfPtr.Target.(*btf.Const)
+	if !ok {
+		return fmt.Errorf("%q is not const", btfVar.Name)
+	}
+
+	_, ok = btfConst.Type.(*btf.Void)
+	if !ok {
+		return fmt.Errorf("%q is not a const void pointer", btfVar.Name)
+	}
+
+	return nil
+}
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index f663737..9755d22 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -161,7 +161,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer,
 	parallels int, dataSupplier func() interface{}) {
-	rd, err := perf.NewReader(emap, perCPUBuffer)
+	rd, err := newQueueReader(emap, perCPUBuffer)
 	if err != nil {
 		m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err))
 		return
@@ -177,10 +177,10 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff
 	}
 }
 
-func (m *Linker) 
asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
 	go func() {
 		for {
-			record, err := rd.Read()
+			sample, err := rd.Read()
 			if err != nil {
 				if errors.Is(err, perf.ErrClosed) {
 					return
@@ -189,22 +189,17 @@ func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier fu
 				continue
 			}
 
-			if record.LostSamples != 0 {
-				log.Warnf("perf event queue(%s) full, dropped %d samples", emap.String(), record.LostSamples)
-				continue
-			}
-
 			data := dataSupplier()
 			if r, ok := data.(reader.EventReader); ok {
-				sampleReader := reader.NewReader(record.RawSample)
+				sampleReader := reader.NewReader(sample)
 				r.ReadFrom(sampleReader)
 				if readErr := sampleReader.HasError(); readErr != nil {
-					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), readErr)
 					continue
 				}
 			} else {
-				if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil {
-					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+				if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil {
+					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
 					continue
 				}
 			}
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 813979d..ca7c330 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,12 +19,179 @@ package btf
 
 import (
 	"context"
+	"fmt"
 	"hash/fnv"
+	"os"
+	"strings"
 	"sync"
 
 	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/perf"
+	"github.com/cilium/ebpf/ringbuf"
 )
 
+// dataQueuePrefix is the name prefix of the marker variable that the DATA_QUEUE
+// macro declares next to each queue map (rover_data_queue_<map name>).
+const dataQueuePrefix = "rover_data_queue_"
+
+var (
+	ringbufChecker   sync.Once
+	ringbufAvailable bool
+)
+
+// isRingbufAvailable reports whether a ring buffer map can be created on this
+// system. The probe runs once; later calls return the cached result.
+func isRingbufAvailable() bool {
+	ringbufChecker.Do(func() {
+		// named "probe" so the imported ringbuf package is not shadowed
+		probe, err := ebpf.NewMap(&ebpf.MapSpec{
+			Type:       ebpf.RingBuf,
+			MaxEntries: uint32(os.Getpagesize()),
+		})
+		if err != nil {
+			// no map was created, so there is nothing to close
+			return
+		}
+		// the probe map is only needed for the check; release it right away
+		_ = probe.Close()
+
+		ringbufAvailable = true
+		log.Infof("detect the ring buffer is available in current system for enhancement of data queue")
+	})
+
+	return ringbufAvailable
+}
+
+// enhanceDataQueueOpts validates every rover_data_queue_<name> marker found in
+// the BTF types of bpfSpec and, when ring buffers are unavailable, rewrites map
+// <name> to a perf event array. It panics on a malformed marker or a marker
+// without a matching map; a nil spec or a spec without BTF types is ignored.
+func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
+	if bpfSpec == nil || bpfSpec.Types == nil {
+		return
+	}
+
+	it := bpfSpec.Types.Iterate()
+	for it.Next() {
+		typeName := it.Type.TypeName()
+		if !strings.HasPrefix(typeName, dataQueuePrefix) {
+			continue
+		}
+		if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
+			panic(fmt.Errorf("invalid global const void ptr var %s: %v", typeName, err))
+		}
+
+		// the map keeps its ring buffer definition when the system supports it
+		if isRingbufAvailable() {
+			continue
+		}
+
+		// if the ringbuf is not available, use a perf event array
+		mapName := strings.TrimPrefix(typeName, dataQueuePrefix)
+		mapSpec, found := bpfSpec.Maps[mapName]
+		if !found || mapSpec == nil {
+			// fail with a clear message instead of a nil pointer dereference
+			panic(fmt.Errorf("data queue var %s has no matching map %q", typeName, mapName))
+		}
+		mapSpec.Type = ebpf.PerfEventArray
+		mapSpec.KeySize = 4
+		mapSpec.ValueSize = 4
+	}
+}
+
+// queueReader reads raw samples from a data queue, whichever map type backs it.
+type queueReader interface {
+	Read() ([]byte, error)
+	Close() error
+}
+
+// newQueueReader creates the reader matching the type of emap; perCPUBuffer
+// is only used by the perf event array reader. On failure the returned
+// interface is a plain nil rather than a nil pointer wrapped in an interface.
+func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
+	switch emap.Type() {
+	case ebpf.RingBuf:
+		r, err := newRingBufReader(emap)
+		if err != nil {
+			return nil, err
+		}
+		return r, nil
+	case ebpf.PerfEventArray:
+		r, err := newPerfQueueReader(emap, perCPUBuffer)
+		if err != nil {
+			return nil, err
+		}
+		return r, nil
+	}
+	return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
+}
+
+// perfQueueReader adapts a perf event array reader to queueReader.
+type perfQueueReader struct {
+	name   string
+	reader *perf.Reader
+}
+
+// newPerfQueueReader opens a perf reader on emap, passing perCPUBuffer through.
+func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) {
+	reader, err := perf.NewReader(emap, perCPUBuffer)
+	if err != nil {
+		return nil, err
+	}
+	return &perfQueueReader{reader: reader, name: emap.String()}, nil
+}
+
+// Read returns the next raw sample. Records that only report lost samples
+// are logged and skipped, so a nil error always comes with a real sample.
+func (p *perfQueueReader) Read() ([]byte, error) {
+	for {
+		read, err := p.reader.Read()
+		if err != nil {
+			return nil, err
+		}
+
+		if read.LostSamples != 0 {
+			log.Warnf("perf event queue(%s) full, dropped %d samples", p.name, read.LostSamples)
+			continue
+		}
+
+		return read.RawSample, nil
+	}
+}
+
+// Close closes the underlying perf reader.
+func (p *perfQueueReader) Close() error {
+	return p.reader.Close()
+}
+
+// ringBufReader adapts a ring buffer reader to queueReader.
+type ringBufReader struct {
+	reader *ringbuf.Reader
+}
+
+// newRingBufReader opens a ring buffer reader on emap.
+func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
+	reader, err := ringbuf.NewReader(emap)
+	if err != nil {
+		return nil, err
+	}
+	return &ringBufReader{reader: reader}, nil
+}
+
+// Read returns the raw bytes of the next ring buffer record.
+func (r *ringBufReader) Read() ([]byte, error) {
+	read, err := r.reader.Read()
+	if err != nil {
+		return nil, err
+	}
+	return read.RawSample, nil
+}
+
+// Close closes the underlying ring buffer reader.
+func (r *ringBufReader) Close() error {
+	return r.reader.Close()
+}
+
 type PartitionContext interface {
 	Start(ctx context.Context)
 	Consume(data interface{})
