darren2013 commented on issue #506: URL: https://github.com/apache/arrow-go/issues/506#issuecomment-3294450147
> Can you share the rest of the relevant code for the object? Such as where the `arrowWriter` is created and the `forceRowGroupFlush` method?

Some additional context:

1. The entry point is `flushBatch`. A `model.Event` represents a log record and contains 909 fields; the average record size is about 1.7 KB.
2. There are 4 goroutines, and each goroutine has its own `ParquetProcessor` instance.
3. The `pqarrow.FileWriter` is long-lived: it stays open until the program exits or it is terminated manually.
4. I tried closing the file writer every 500,000 records and creating a new one; this releases memory significantly. However, it produces many small files, and more importantly I cannot tell in advance when memory will run out; it might be at 300,000 records or at 1,000,000. Ideally, the memory would be released as soon as the batch write finishes. (A sketch of this workaround is included below.)
5. The attachment is a pprof profile; you can analyze it with `go tool pprof -http=:8080 pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz`.

Thank you for your help!

[pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz](https://github.com/user-attachments/files/22353503/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz)
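For reference, here is a minimal sketch of the workaround from point 4. The `recordsWritten` and `rotationThreshold` fields are illustrative only and do not exist in the real `ParquetProcessor`; the rest reuses the close/initialize logic from the code below:

```go
// Hypothetical sketch of the "close and recreate every N records" workaround.
// recordsWritten and rotationThreshold are made-up fields for illustration.
func (p *ParquetProcessor) maybeRotateWriter(justWritten int) error {
    p.recordsWritten += int64(justWritten)
    if p.recordsWritten < p.rotationThreshold { // e.g. 500000
        return nil
    }
    // Closing the pqarrow.FileWriter finalizes the file; this is the point
    // where memory usage drops significantly in my tests.
    if p.arrowWriter != nil {
        if err := p.arrowWriter.Close(); err != nil {
            return fmt.Errorf("failed to close writer during rotation: %w", err)
        }
        p.arrowWriter = nil
    }
    if p.currentFile != nil {
        if err := p.currentFile.Close(); err != nil {
            return fmt.Errorf("failed to close file during rotation: %w", err)
        }
        p.currentFile = nil
    }
    p.recordsWritten = 0
    // Open a new file and create a fresh pqarrow.FileWriter.
    return p.initializeParquetFile()
}
```

The full `ParquetProcessor` implementation follows: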
```go
package parquet

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "sync/atomic"
    "time"

    "github.com/darren2013/log_fusion_core/pkg/common"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
    "github.com/apache/arrow-go/v18/parquet"
    "github.com/apache/arrow-go/v18/parquet/compress"
    "github.com/apache/arrow-go/v18/parquet/pqarrow"
    "github.com/apache/arrow-go/v18/parquet/schema"
    "github.com/darren2013/log_fusion_core/pkg/api"
    "github.com/darren2013/log_fusion_core/pkg/model"
)

// Global instance counter, used to generate unique instance IDs.
var instanceCounter int64

// ParquetProcessor handles writing log data to Parquet files.
type ParquetProcessor struct {
    Counter      *api.Counter
    TotalCounter *api.Counter
    FailCounter  *api.Counter
    config       *ParquetConfig
    instanceID   int64 // unique instance ID
    batchSize    int
    dictionary   *model.Dictionary // dictionary object with field definitions and types
    currentDay   string            // current day, format YYYYMMDD
    dataDir      string            // base data directory
    filePath     string            // path of the file currently being written

    // Arrow related
    mem           memory.Allocator     // Arrow memory allocator
    arrowSchema   *arrow.Schema        // Arrow schema
    recordBuilder *array.RecordBuilder // Arrow record builder

    // Parquet related
    arrowWriter   *pqarrow.FileWriter       // Arrow Parquet file writer
    currentFile   *os.File                  // currently open file
    parquetSchema *schema.GroupNode         // Parquet schema (kept for backward compatibility)
    writerProps   *parquet.WriterProperties // writer properties

    // Note: the memory-monitoring fields were removed, because the root cause of
    // the memory leak is the Parquet library's internal C++ memory management.
}

// ParquetConfig holds the configuration of the Parquet processor.
type ParquetConfig struct {
    DictionaryPath string `json:"dictionary_path"` // dictionary file path
    DataDir        string `json:"data_dir"`        // base data directory
    BatchSize      int    `json:"batch_size"`      // batch buffer size
    Compression    string `json:"compression"`     // compression algorithm; options: SNAPPY, GZIP, ZSTD
}

// NewParquetProcessor creates a new Parquet processor.
func NewParquetProcessor(counter, totalCounter, failCounter *api.Counter) (*ParquetProcessor, error) {
    // generate a unique instance ID
    instanceID := atomic.AddInt64(&instanceCounter, 1)

    // create the processor instance
    processor := &ParquetProcessor{
        Counter:      counter,
        TotalCounter: totalCounter,
        FailCounter:  failCounter,
        config:       &ParquetConfig{},
        instanceID:   instanceID,
        batchSize:    1000,     // default, overridden by UpdateConfig
        dataDir:      "./data", // default, overridden by UpdateConfig
        mem:          memory.NewGoAllocator(), // initialize the Arrow memory allocator
    }

    log.Printf("Created ParquetProcessor instance, ID: %d", instanceID)
    return processor, nil
}

// Process handles a single event.
func (p *ParquetProcessor) Process(entity model.Event) {
    // delegate to BatchProcess with a single event
    p.BatchProcess([]model.Event{entity})
}

// BatchProcess processes events in batches.
func (p *ParquetProcessor) BatchProcess(entities []model.Event) {
    // Note: memory monitoring was removed, because the root cause of the memory
    // leak is the Parquet library's internal C++ memory management.

    // check whether we need to rotate to a new date
    day := time.Now().Format("20060102")
    // check whether we need to switch to a new day
    if p.currentDay != day {
        if err := p.rotateToNewDay(day); err != nil {
            log.Printf("ParquetProcessor[%d] Error rotating to new day: %v", p.instanceID, err)
            (*p.FailCounter).Count()
            return
        }
    }

    // if arrowWriter is nil, it needs to be reinitialized
    if p.arrowWriter == nil {
        if err := p.initializeParquetFile(); err != nil {
            log.Printf("ParquetProcessor[%d] Error initializing Parquet file: %v", p.instanceID, err)
            (*p.FailCounter).Count()
            return
        }
    }

    // real batch processing
    if len(entities) == 0 {
        return
    }

    // write all data in one call
    if err := p.flushBatch(entities); err != nil {
        log.Printf("ParquetProcessor[%d] Error flushing batch: %v", p.instanceID, err)
        (*p.FailCounter).Count()
        return
    }

    // update the counter
    (*p.Counter).CountN(int64(len(entities)))
}

// GetName returns the processor name.
func (p *ParquetProcessor) GetName() string {
    return "parquet-processor"
}

// Start starts the processor.
func (p *ParquetProcessor) Start() error {
    return nil
}

// rotateToNewDay rotates to a new day: flushes the batch, closes the current file
// and initializes a new one.
func (p *ParquetProcessor) rotateToNewDay(day string) error {
    // no flush is needed before rotating, because there is no batch buffer

    // close the current Arrow Parquet file
    if p.arrowWriter != nil {
        if err := p.arrowWriter.Close(); err != nil {
            log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer: %v", p.instanceID, err)
        }
        p.arrowWriter = nil
    }
    if p.currentFile != nil {
        if err := p.currentFile.Close(); err != nil {
            log.Printf("ParquetProcessor[%d] Error closing file: %v", p.instanceID, err)
        }
        p.currentFile = nil
    }

    // initialize with the new date
    p.currentDay = day
    if err := p.initializeParquetFile(); err != nil {
        log.Printf("ParquetProcessor[%d] Error initializing Parquet file for new day: %v", p.instanceID, err)
        (*p.FailCounter).Count()
        return err
    }
    return nil
}

// initializeParquetFile initializes the Parquet file for the current day.
func (p *ParquetProcessor) initializeParquetFile() error {
    p.currentDay = time.Now().Format("20060102")

    // create the directory for the current day
    dayDir := filepath.Join(p.dataDir, p.currentDay)
    if err := os.MkdirAll(dayDir, 0755); err != nil {
        return fmt.Errorf("failed to create directory: %w", err)
    }

    // create a file name with timestamp and instance ID to guarantee uniqueness
    timestamp := time.Now().Format("150405")
    fileName := fmt.Sprintf("logs_%s_inst%d.parquet", timestamp, p.instanceID)
    filePath := filepath.Join(dayDir, fileName)

    // record the file path
    p.filePath = filePath

    // create the Arrow and Parquet schemas
    if p.arrowSchema == nil {
        // build the Arrow schema
        p.arrowSchema = buildArrowSchema(p.dictionary)
        // build the Parquet schema (kept for backward compatibility)
        p.parquetSchema = buildParquetSchema(p.dictionary)
    }

    // create the file to write to
    f, err := os.Create(filePath)
    if err != nil {
        return fmt.Errorf("failed to create file: %w", err)
    }
    p.currentFile = f

    // writer properties
    writerProps := parquet.NewWriterProperties(
        parquet.WithCompression(getCompressionType(p.config.Compression)),
        parquet.WithVersion(parquet.V2_LATEST),
    )
    p.writerProps = writerProps

    // create the Arrow Parquet file writer
    arrowWriter, err := pqarrow.NewFileWriter(p.arrowSchema, f, writerProps, pqarrow.DefaultWriterProps())
    if err != nil {
        return fmt.Errorf("failed to create arrow parquet writer: %w", err)
    }
    p.arrowWriter = arrowWriter

    log.Printf("ParquetProcessor[%d] Created new Parquet file at %s with compression %s", p.instanceID, filePath, p.config.Compression)
    return nil
}

// buildArrowSchema builds the Arrow schema.
func buildArrowSchema(dictionary *model.Dictionary) *arrow.Schema {
    // record field names and their field types, keeping a consistent order
    fieldNames := []string{}
    fieldTypes := make(map[string]string)

    // add the timestamp field (always the first field)
    fieldNames = append(fieldNames, "timestamp")
    fieldTypes["timestamp"] = "timestamp"

    // collect the remaining field names so the order stays stable
    remainingFields := make([]string, 0, len(dictionary.FieldTypes))
    for fieldName := range dictionary.FieldTypes {
        if fieldName != "timestamp" { // skip timestamp, it was already added
            remainingFields = append(remainingFields, fieldName)
        }
    }

    // keep the field order stable; fields could be sorted by name or follow the
    // dictionary order. Here they are simply added in ID order.
    for _, item := range dictionary.Items {
        if item.WordEnName != "timestamp" && item.Enable {
            // only add fields that have not been added yet (avoid duplicates)
            isAdded := false
            for _, name := range fieldNames {
                if name == item.WordEnName {
                    isAdded = true
                    break
                }
            }
            if !isAdded {
                fieldNames = append(fieldNames, item.WordEnName)
                fieldTypes[item.WordEnName] = item.WordType
            }
        }
    }

    // log all fields and their types for debugging
    log.Printf("Building Arrow schema, field count: %d", len(fieldNames))

    // create the fields in the recorded order
    fields := make([]arrow.Field, 0, len(fieldNames))

    // iterate over the field name list and create fields in order
    for i, fieldName := range fieldNames {
        fieldType := fieldTypes[fieldName]
        var arrowType arrow.DataType

        // map the field type to an Arrow type
        switch fieldType {
        case "string":
            arrowType = arrow.BinaryTypes.String
        case "int", "integer":
            arrowType = arrow.PrimitiveTypes.Int32
        case "float", "double":
            arrowType = arrow.PrimitiveTypes.Float64
        case "bool", "boolean":
            arrowType = arrow.FixedWidthTypes.Boolean
        case "timestamp":
            arrowType = arrow.PrimitiveTypes.Int64
        default:
            // unknown types default to string
            arrowType = arrow.BinaryTypes.String
        }

        // create the Arrow field
        field := arrow.Field{
            Name:     fieldName,
            Type:     arrowType,
            Nullable: fieldName != "timestamp" || i != 0, // timestamp is not nullable when it is the first field
        }
        fields = append(fields, field)
    }

    // create the Arrow schema
    schema := arrow.NewSchema(fields, nil)
    log.Printf("Arrow schema built, total fields: %d", len(fields))
    return schema
}

// buildParquetSchema builds the Parquet schema (kept for backward compatibility).
func buildParquetSchema(dictionary *model.Dictionary) *schema.GroupNode {
    // build the Parquet schema from the Arrow schema
    arrowSchema := buildArrowSchema(dictionary)

    // convert the Arrow schema into a Parquet schema
    fields := make(schema.FieldList, 0, len(arrowSchema.Fields()))
    for _, field := range arrowSchema.Fields() {
        var parquetField schema.Node

        // map Arrow types to Parquet types
        switch field.Type.ID() {
        case arrow.STRING:
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewByteArrayNode(field.Name, repetition, -1)
        case arrow.INT32:
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewInt32Node(field.Name, repetition, -1)
        case arrow.INT64:
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewInt64Node(field.Name, repetition, -1)
        case arrow.FLOAT64:
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewFloat64Node(field.Name, repetition, -1)
        case arrow.BOOL:
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewBooleanNode(field.Name, repetition, -1)
        default:
            // unknown types default to string (byte array)
            repetition := parquet.Repetitions.Optional
            if !field.Nullable {
                repetition = parquet.Repetitions.Required
            }
            parquetField = schema.NewByteArrayNode(field.Name, repetition, -1)
        }
        fields = append(fields, parquetField)
    }

    // create the root node
    node, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
    if err != nil {
        log.Fatalf("Failed to build Parquet schema: %v", err)
    }
    log.Printf("Parquet schema built, total fields: %d", len(fields))
    return node
}

// Note: the checkMemoryUsage method was removed, because the root cause of the
// memory leak is the Parquet library's internal C++ memory management,
// so memory monitoring is not an effective solution.

// flushBatch writes the given batch to the Parquet file using the latest Arrow Go v18 approach.
func (p *ParquetProcessor) flushBatch(entities []model.Event) error {
    if len(entities) == 0 {
        return nil
    }

    // check that the Arrow schema is initialized
    if p.arrowSchema == nil {
        return fmt.Errorf("arrow schema not initialized")
    }
    // check that the Arrow writer is initialized
    if p.arrowWriter == nil {
        return fmt.Errorf("arrow writer not initialized")
    }

    // current time
    now := time.Now().UnixMilli()

    // create the Arrow record builder
    if p.recordBuilder == nil {
        p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)
    }
    // clear the previous builder state
    p.recordBuilder.Release()
    p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)

    // fill the data field by field
    for i, field := range p.arrowSchema.Fields() {
        fieldName := field.Name

        // the timestamp field gets special handling
        if fieldName == "timestamp" {
            builder := p.recordBuilder.Field(i).(*array.Int64Builder)
            for j := 0; j < len(entities); j++ {
                builder.Append(now)
            }
            continue
        }

        // fill the data according to the field type
        switch field.Type.ID() {
        case arrow.STRING:
            builder := p.recordBuilder.Field(i).(*array.StringBuilder)
            for _, entity := range entities {
                val, ok := entity.Values[fieldName]
                if !ok || val == nil {
                    builder.AppendNull()
                } else {
                    var strVal string
                    switch v := val.(type) {
                    case string:
                        strVal = v
                    default:
                        strVal = fmt.Sprintf("%v", v)
                    }
                    builder.Append(strVal)
                }
            }
        case arrow.INT32:
            builder := p.recordBuilder.Field(i).(*array.Int32Builder)
            for _, entity := range entities {
                val, ok := entity.Values[fieldName]
                if !ok || val == nil {
                    builder.AppendNull()
                } else {
                    var intVal int32
                    switch v := val.(type) {
                    case int:
                        intVal = int32(v)
                    case int64:
                        intVal = int32(v)
                    case float64:
                        intVal = int32(v)
                    case string:
                        var f float64
                        if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                            intVal = int32(f)
                        }
                    }
                    builder.Append(intVal)
                }
            }
        case arrow.INT64:
            builder := p.recordBuilder.Field(i).(*array.Int64Builder)
            for _, entity := range entities {
                val, ok := entity.Values[fieldName]
                if !ok || val == nil {
                    builder.AppendNull()
                } else {
                    var int64Val int64
                    switch v := val.(type) {
                    case int:
                        int64Val = int64(v)
                    case int64:
                        int64Val = v
                    case float64:
                        int64Val = int64(v)
                    case time.Time:
                        int64Val = v.UnixMilli()
                    case string:
                        var f float64
                        if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                            int64Val = int64(f)
                        } else {
                            // try to parse a time string
                            formats := []string{
                                time.RFC3339,
                                "2006-01-02 15:04:05",
                                "2006/01/02 15:04:05",
                                "01/02/2006 15:04:05",
                                "02/01/2006 15:04:05",
                            }
                            for _, format := range formats {
                                if t, err := time.Parse(format, v); err == nil {
                                    int64Val = t.UnixMilli()
                                    break
                                }
                            }
                        }
                    }
                    builder.Append(int64Val)
                }
            }
        case arrow.FLOAT64:
            builder := p.recordBuilder.Field(i).(*array.Float64Builder)
            for _, entity := range entities {
                val, ok := entity.Values[fieldName]
                if !ok || val == nil {
                    builder.AppendNull()
                } else {
                    var floatVal float64
                    switch v := val.(type) {
                    case float64:
                        floatVal = v
                    case float32:
                        floatVal = float64(v)
                    case int:
                        floatVal = float64(v)
                    case int64:
                        floatVal = float64(v)
                    case string:
                        fmt.Sscanf(v, "%f", &floatVal)
                    }
                    builder.Append(floatVal)
                }
            }
        case arrow.BOOL:
            builder := p.recordBuilder.Field(i).(*array.BooleanBuilder)
            for _, entity := range entities {
                val, ok := entity.Values[fieldName]
                if !ok || val == nil {
                    builder.AppendNull()
                } else {
                    var boolVal bool
                    switch v := val.(type) {
                    case bool:
                        boolVal = v
                    case string:
                        switch v {
                        case "true", "True", "TRUE", "1":
                            boolVal = true
                        case "false", "False", "FALSE", "0":
                            boolVal = false
                        }
                    case int:
                        boolVal = v != 0
                    case float64:
                        boolVal = v != 0
                    }
                    builder.Append(boolVal)
                }
            }
        default:
            log.Printf("Warning: field %s has unsupported type %s", fieldName, field.Type.ID())
        }
    }

    // build the Arrow record batch
    recordBatch := p.recordBuilder.NewRecordBatch()
    defer recordBatch.Release()

    // write to the Parquet file using the latest Arrow Go v18 approach
    if err := p.arrowWriter.Write(recordBatch); err != nil {
        return fmt.Errorf("failed to write record batch to parquet: %w", err)
    }

    // call TotalCounter.CountN to record the number of successfully written records
    (*p.TotalCounter).CountN(int64(len(entities)))
    return nil
}

// getCompressionType maps a compression algorithm name to the Parquet compression type.
func getCompressionType(compression string) compress.Compression {
    switch compression {
    case "SNAPPY":
        return compress.Codecs.Snappy
    case "GZIP":
        return compress.Codecs.Gzip
    case "ZSTD":
        return compress.Codecs.Zstd
    default:
        // default to SNAPPY compression
        return compress.Codecs.Snappy
    }
}

// Close finishes the processor's work and flushes all pending data.
func (p *ParquetProcessor) Close() error {
    var closeErrors []error

    // release the Arrow record builder
    if p.recordBuilder != nil {
        p.recordBuilder.Release()
        p.recordBuilder = nil
    }

    // close the Arrow Parquet writer
    if p.arrowWriter != nil {
        log.Printf("ParquetProcessor[%d] Closing Arrow Parquet writer... current file: %s", p.instanceID, p.filePath)
        if err := p.arrowWriter.Close(); err != nil {
            log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer: %v", p.instanceID, err)
            closeErrors = append(closeErrors, fmt.Errorf("failed to close arrow parquet writer: %w", err))
        }
        p.arrowWriter = nil
    }

    // close the file
    if p.currentFile != nil {
        log.Printf("ParquetProcessor[%d] Closing file: %s", p.instanceID, p.filePath)
        if err := p.currentFile.Close(); err != nil {
            log.Printf("ParquetProcessor[%d] Error closing file: %v", p.instanceID, err)
            closeErrors = append(closeErrors, fmt.Errorf("failed to close file: %w", err))
        }
        p.currentFile = nil
    }

    // drop the Arrow schema reference
    if p.arrowSchema != nil {
        p.arrowSchema = nil
    }
    // drop the Parquet schema reference
    if p.parquetSchema != nil {
        p.parquetSchema = nil
    }
    // drop the writer properties reference
    if p.writerProps != nil {
        p.writerProps = nil
    }

    if len(closeErrors) > 0 {
        log.Printf("ParquetProcessor[%d] Encountered %d errors while closing", p.instanceID, len(closeErrors))
        return fmt.Errorf("close errors: %v", closeErrors)
    }

    log.Printf("ParquetProcessor[%d] Fully closed, file: %s", p.instanceID, p.filePath)
    return nil
}

// UpdateConfig updates the processor configuration.
func (p *ParquetProcessor) UpdateConfig(configData map[string]interface{}) error {
    // parse the configuration
    configBytes, err := json.Marshal(configData)
    if err != nil {
        return fmt.Errorf("failed to marshal config: %w", err)
    }
    var newConfig ParquetConfig
    if err := json.Unmarshal(configBytes, &newConfig); err != nil {
        return fmt.Errorf("failed to unmarshal config: %w", err)
    }

    // no flush is needed because there is no batch buffer
    // only the first configuration requires initialization
    needReinitialization := p.dictionary == nil
    if needReinitialization {
        // close existing resources
        if p.arrowWriter != nil {
            if err := p.arrowWriter.Close(); err != nil {
                log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer during config update: %v", p.instanceID, err)
            }
            p.arrowWriter = nil
        }
        if p.currentFile != nil {
            if err := p.currentFile.Close(); err != nil {
                log.Printf("ParquetProcessor[%d] Error closing file during config update: %v", p.instanceID, err)
            }
            p.currentFile = nil
        }

        // update the data directory
        if newConfig.DataDir != "" {
            p.dataDir = newConfig.DataDir
        }

        // load the dictionary
        if p.dictionary == nil {
            dictionary, err := common.GetDic(newConfig.DictionaryPath)
            if err != nil {
                return fmt.Errorf("failed to load dictionary from %s: %w", newConfig.DictionaryPath, err)
            }
            p.dictionary = dictionary

            // rebuild the Arrow and Parquet schemas
            p.arrowSchema = buildArrowSchema(p.dictionary)
            p.parquetSchema = buildParquetSchema(p.dictionary)
        }
    }

    // update the batch size (batching is no longer used, but keep the setting)
    if newConfig.BatchSize > 0 {
        p.batchSize = newConfig.BatchSize
    }

    // update the compression algorithm
    if newConfig.Compression != "" {
        p.config.Compression = newConfig.Compression
    }

    // update the remaining fields of the config object
    if newConfig.DataDir != "" {
        dirPath, pathErr := common.GetDataDirPath(p.dataDir)
        if pathErr != nil {
            fmt.Println("data pathErr, use default", pathErr)
        }
        p.config.DataDir = dirPath
    }
    if newConfig.DictionaryPath != "" {
        p.config.DictionaryPath = newConfig.DictionaryPath
    }

    // reinitialize the Parquet file if needed
    if p.arrowWriter == nil || p.currentFile == nil {
        if err := p.initializeParquetFile(); err != nil {
            return fmt.Errorf("failed to initialize Parquet file: %w", err)
        }
    }

    log.Printf("ParquetProcessor[%d] configuration updated successfully", p.instanceID)
    return nil
}

// ReadParquetFile reads a Parquet file and returns event data.
// Note: this still needs to be implemented; it is commented out for now to avoid compile errors.
/*
func (p *ParquetProcessor) ReadParquetFile(filePath string) ([]model.Event, error) {
    // a proper implementation requires the correct Arrow read API
    // return an empty implementation for now
    return nil, fmt.Errorf("read functionality not implemented yet")
}
*/
```
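If it helps narrow things down, one thing I can also try on my side is wrapping the allocator in arrow-go's `memory.CheckedAllocator`, so I can see whether the Go-side builder/record allocations are actually released after each batch, separately from whatever the Parquet writer holds internally. A minimal standalone sketch (assuming `CheckedAllocator.CurrentAlloc()` as the accessor; it only tracks allocations routed through this allocator, not the writer's internal buffers):

```go
package main

import (
    "fmt"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
    // Wrap the normal allocator so allocations made through it can be inspected.
    checked := memory.NewCheckedAllocator(memory.NewGoAllocator())

    schema := arrow.NewSchema([]arrow.Field{
        {Name: "timestamp", Type: arrow.PrimitiveTypes.Int64},
        {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true},
    }, nil)

    builder := array.NewRecordBuilder(checked, schema)
    builder.Field(0).(*array.Int64Builder).Append(1)
    builder.Field(1).(*array.StringBuilder).Append("hello")

    rec := builder.NewRecord()
    fmt.Println("bytes allocated while record is alive:", checked.CurrentAlloc())

    rec.Release()
    builder.Release()
    fmt.Println("bytes allocated after releasing:", checked.CurrentAlloc())
}
```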