darren2013 commented on issue #506:
URL: https://github.com/apache/arrow-go/issues/506#issuecomment-3294450147

   > Can you share the rest of the relevant code for the object? Such as where the `arrowWriter` is created and the `forceRowGroupFlush` method?
   
   Some additional context:
   1. The entry point is the `flushBatch` method. Each `model.Event` represents a log record containing 909 fields; a record is about 1.7 KB on average.
   2. There are 4 goroutines, and each goroutine has its own ParquetProcessor instance.
   3. The pqarrow.FileWriter is long-lived: it stays open until the program exits or is terminated manually.
   4. As a workaround, I tried closing the file writer every 500,000 records and creating a new one; this releases memory significantly. But it produces many small files, and more importantly, I cannot predict when memory will be exhausted — it might happen at 300,000 records or at 1,000,000. Ideally, the memory would be released as soon as the batch write completes. (A minimal sketch of this rotation workaround is included after the attachment below.)
   5. The attachment is a pprof profile; you can analyze it with `go tool pprof -http=:8080 pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz`.
   
   Thank you for your help!
   
   
[pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz](https://github.com/user-attachments/files/22353503/pprof.log_fusion.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz)
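
   For reference, here is a minimal sketch of the rotation workaround described in point 4. It is illustrative only: the `recordsWritten` field, the `rotateThreshold` constant, and the `writeWithRotation` method do not exist in the actual code below.

   ```go
   // Hypothetical sketch of the point-4 workaround: close and recreate the
   // writer every N records so the parquet writer's buffered data is released.
   // recordsWritten would be an extra int64 field on ParquetProcessor.
   const rotateThreshold = 500000

   func (p *ParquetProcessor) writeWithRotation(entities []model.Event) error {
        if err := p.flushBatch(entities); err != nil {
                return err
        }
        p.recordsWritten += int64(len(entities))
        if p.recordsWritten < rotateThreshold {
                return nil
        }
        // Closing the writer finalizes the current (small) file and frees
        // its buffered row-group data; a new file is then started.
        if err := p.arrowWriter.Close(); err != nil {
                return err
        }
        p.arrowWriter = nil
        if err := p.currentFile.Close(); err != nil {
                return err
        }
        p.currentFile = nil
        p.recordsWritten = 0
        return p.initializeParquetFile()
   }
   ```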
   
   ```go
   package parquet
   
   import (
        "encoding/json"
        "fmt"
        "log"
        "os"
        "path/filepath"
        "sync/atomic"
        "time"
   
        "github.com/darren2013/log_fusion_core/pkg/common"
   
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/compress"
        "github.com/apache/arrow-go/v18/parquet/pqarrow"
        "github.com/apache/arrow-go/v18/parquet/schema"
        "github.com/darren2013/log_fusion_core/pkg/api"
        "github.com/darren2013/log_fusion_core/pkg/model"
   )
   
   // Global instance counter, used to generate unique instance IDs
   var instanceCounter int64
   
   // ParquetProcessor handles writing log data to Parquet files
   type ParquetProcessor struct {
        Counter      *api.Counter
        TotalCounter *api.Counter
        FailCounter  *api.Counter
   
        config     *ParquetConfig
        instanceID int64 // unique instance ID
        batchSize  int
        dictionary *model.Dictionary // dictionary object containing field definitions and types
        currentDay string            // current day, formatted as YYYYMMDD
        dataDir    string            // base directory for data storage
        filePath   string            // path of the file currently being written
   
        // Arrow-related state
        mem           memory.Allocator     // Arrow memory allocator
        arrowSchema   *arrow.Schema        // Arrow schema
        recordBuilder *array.RecordBuilder // Arrow record builder

        // Parquet-related state
        arrowWriter   *pqarrow.FileWriter       // Arrow Parquet file writer
        currentFile   *os.File                  // currently open file
        parquetSchema *schema.GroupNode         // Parquet schema (kept for backward compatibility)
        writerProps   *parquet.WriterProperties // writer properties
   
        // Note: the memory-monitoring fields were removed, because the root cause
        // of the memory leak lies in the Parquet library's internal C++ memory management
   }
   
   // ParquetConfig holds the configuration for the Parquet processor
   type ParquetConfig struct {
        DictionaryPath string `json:"dictionary_path"` // dictionary file path
        DataDir        string `json:"data_dir"`        // base directory for data storage
        BatchSize      int    `json:"batch_size"`      // batch buffer size
        Compression    string `json:"compression"`     // compression algorithm; options: SNAPPY, GZIP, ZSTD
   }
   
   // NewParquetProcessor creates a new Parquet processor
   func NewParquetProcessor(counter, totalCounter, failCounter *api.Counter) (*ParquetProcessor, error) {
        // Generate a unique instance ID
        instanceID := atomic.AddInt64(&instanceCounter, 1)

        // Create the processor instance
        processor := &ParquetProcessor{
                Counter:      counter,
                TotalCounter: totalCounter,
                FailCounter:  failCounter,
                config:       &ParquetConfig{},
                instanceID:   instanceID,
                batchSize:    1000,                    // default, overridden by UpdateConfig
                dataDir:      "./data",                // default, overridden by UpdateConfig
                mem:          memory.NewGoAllocator(), // initialize the Arrow memory allocator
        }

        log.Printf("Created ParquetProcessor instance, ID: %d", instanceID)
        return processor, nil
   }
   
   // Process handles a single event
   func (p *ParquetProcessor) Process(entity model.Event) {
        // Delegate to BatchProcess with a single-event slice
        p.BatchProcess([]model.Event{entity})
   }
   
   // BatchProcess processes a batch of events
   func (p *ParquetProcessor) BatchProcess(entities []model.Event) {
        // Note: memory monitoring was removed, because the root cause of the
        // leak lies in the Parquet library's internal C++ memory management

        // Check whether we need to rotate to a new day
        day := time.Now().Format("20060102")
        if p.currentDay != day {
                if err := p.rotateToNewDay(day); err != nil {
                        log.Printf("ParquetProcessor[%d] Error rotating to new day: %v", p.instanceID, err)
                        (*p.FailCounter).Count()
                        return
                }
        }

        // If arrowWriter is nil, it needs to be re-initialized
        if p.arrowWriter == nil {
                if err := p.initializeParquetFile(); err != nil {
                        log.Printf("ParquetProcessor[%d] Error initializing Parquet file: %v", p.instanceID, err)
                        (*p.FailCounter).Count()
                        return
                }
        }

        // The actual batch processing logic
        if len(entities) == 0 {
                return
        }

        // Write all the data in one go
        if err := p.flushBatch(entities); err != nil {
                log.Printf("ParquetProcessor[%d] Error flushing batch: %v", p.instanceID, err)
                (*p.FailCounter).Count()
                return
        }

        // Update the counter
        (*p.Counter).CountN(int64(len(entities)))
   }
   
   // GetName returns the processor name
   func (p *ParquetProcessor) GetName() string {
        return "parquet-processor"
   }
   
   // Start starts the processor
   func (p *ParquetProcessor) Start() error {
        return nil
   }
   
   // rotateToNewDay rotates to a new day: closes the current file and initializes a new one
   func (p *ParquetProcessor) rotateToNewDay(day string) error {
        // Nothing to flush before rotating, since there is no batch buffer

        // Close the current Arrow Parquet writer
        if p.arrowWriter != nil {
                if err := p.arrowWriter.Close(); err != nil {
                        log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer: %v", p.instanceID, err)
                }
                p.arrowWriter = nil
        }

        if p.currentFile != nil {
                if err := p.currentFile.Close(); err != nil {
                        log.Printf("ParquetProcessor[%d] Error closing file: %v", p.instanceID, err)
                }
                p.currentFile = nil
        }

        // Initialize with the new day
        p.currentDay = day
        if err := p.initializeParquetFile(); err != nil {
                log.Printf("ParquetProcessor[%d] Error initializing Parquet file for new day: %v", p.instanceID, err)
                (*p.FailCounter).Count()
                return err
        }

        return nil
   }
   
   // initializeParquetFile initializes the Parquet file for the current day
   func (p *ParquetProcessor) initializeParquetFile() error {
        p.currentDay = time.Now().Format("20060102")

        // Create the directory for the current day
        dayDir := filepath.Join(p.dataDir, p.currentDay)
        if err := os.MkdirAll(dayDir, 0755); err != nil {
                return fmt.Errorf("failed to create directory: %w", err)
        }

        // Build a file name from the timestamp and instance ID to ensure uniqueness
        timestamp := time.Now().Format("150405")
        fileName := fmt.Sprintf("logs_%s_inst%d.parquet", timestamp, p.instanceID)
        filePath := filepath.Join(dayDir, fileName)

        // Record the file path
        p.filePath = filePath

        // Build the Arrow and Parquet schemas
        if p.arrowSchema == nil {
                // Build the Arrow schema
                p.arrowSchema = buildArrowSchema(p.dictionary)
                // Build the Parquet schema (kept for backward compatibility)
                p.parquetSchema = buildParquetSchema(p.dictionary)
        }

        // Create the file for writing
        f, err := os.Create(filePath)
        if err != nil {
                return fmt.Errorf("failed to create file: %w", err)
        }
        p.currentFile = f

        // Set up the writer properties
        writerProps := parquet.NewWriterProperties(
                parquet.WithCompression(getCompressionType(p.config.Compression)),
                parquet.WithVersion(parquet.V2_LATEST),
        )
        p.writerProps = writerProps

        // Create the Arrow Parquet file writer
        arrowWriter, err := pqarrow.NewFileWriter(p.arrowSchema, f, writerProps, pqarrow.DefaultWriterProps())
        if err != nil {
                return fmt.Errorf("failed to create arrow parquet writer: %w", err)
        }
        p.arrowWriter = arrowWriter

        log.Printf("ParquetProcessor[%d] Created new Parquet file at %s with compression %s", p.instanceID, filePath, p.config.Compression)

        return nil
   }
   
   // buildArrowSchema builds the Arrow schema
   func buildArrowSchema(dictionary *model.Dictionary) *arrow.Schema {
        // Track field names and their types, keeping a consistent order
        fieldNames := []string{}
        fieldTypes := make(map[string]string)

        // Add the timestamp field (always the first field)
        fieldNames = append(fieldNames, "timestamp")
        fieldTypes["timestamp"] = "timestamp"

        // Collect the field list so the order can be kept stable
        remainingFields := make([]string, 0, len(dictionary.FieldTypes))
        for fieldName := range dictionary.FieldTypes {
                if fieldName != "timestamp" { // skip timestamp, already added
                        remainingFields = append(remainingFields, fieldName)
                }
        }

        // Keep the field order stable: we could sort by field name, but here we
        // simply follow the order of the dictionary items (sorted by ID)
        for _, item := range dictionary.Items {
                if item.WordEnName != "timestamp" && item.Enable {
                        // Only add fields that have not been added yet (avoid duplicates)
                        isAdded := false
                        for _, name := range fieldNames {
                                if name == item.WordEnName {
                                        isAdded = true
                                        break
                                }
                        }

                        if !isAdded {
                                fieldNames = append(fieldNames, item.WordEnName)
                                fieldTypes[item.WordEnName] = item.WordType
                        }
                }
        }

        // Log all fields and their types for debugging
        log.Printf("Building Arrow schema, field count: %d", len(fieldNames))

        // Create fields in the recorded order
        fields := make([]arrow.Field, 0, len(fieldNames))

        // Iterate over the field names, creating fields in order
        for i, fieldName := range fieldNames {
                fieldType := fieldTypes[fieldName]
                var arrowType arrow.DataType

                // Map field types to Arrow types
                switch fieldType {
                case "string":
                        arrowType = arrow.BinaryTypes.String
                case "int", "integer":
                        arrowType = arrow.PrimitiveTypes.Int32
                case "float", "double":
                        arrowType = arrow.PrimitiveTypes.Float64
                case "bool", "boolean":
                        arrowType = arrow.FixedWidthTypes.Boolean
                case "timestamp":
                        arrowType = arrow.PrimitiveTypes.Int64
                default:
                        // Unknown types default to string
                        arrowType = arrow.BinaryTypes.String
                }

                // Create the Arrow field
                field := arrow.Field{
                        Name:     fieldName,
                        Type:     arrowType,
                        Nullable: fieldName != "timestamp" || i != 0, // timestamp is non-nullable when it is the first field
                }
                fields = append(fields, field)
        }

        // Create the Arrow schema
        schema := arrow.NewSchema(fields, nil)

        log.Printf("Arrow schema built, total fields: %d", len(fields))

        return schema
   }
   
   // buildParquetSchema builds the Parquet schema (kept for backward compatibility)
   func buildParquetSchema(dictionary *model.Dictionary) *schema.GroupNode {
        // Derive the Parquet schema from the Arrow schema
        arrowSchema := buildArrowSchema(dictionary)

        // Convert the Arrow schema into a Parquet schema
        fields := make(schema.FieldList, 0, len(arrowSchema.Fields()))

        for _, field := range arrowSchema.Fields() {
                var parquetField schema.Node

                // Map Arrow types to Parquet types
                switch field.Type.ID() {
                case arrow.STRING:
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewByteArrayNode(field.Name, repetition, -1)
                case arrow.INT32:
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewInt32Node(field.Name, repetition, -1)
                case arrow.INT64:
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewInt64Node(field.Name, repetition, -1)
                case arrow.FLOAT64:
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewFloat64Node(field.Name, repetition, -1)
                case arrow.BOOL:
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewBooleanNode(field.Name, repetition, -1)
                default:
                        // Unknown types default to byte array (string)
                        repetition := parquet.Repetitions.Optional
                        if !field.Nullable {
                                repetition = parquet.Repetitions.Required
                        }
                        parquetField = schema.NewByteArrayNode(field.Name, repetition, -1)
                }

                fields = append(fields, parquetField)
        }

        // Create the root node
        node, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
        if err != nil {
                log.Fatalf("Failed to build Parquet schema: %v", err)
        }

        log.Printf("Parquet schema built, total fields: %d", len(fields))

        return node
   }
   
   // Note: the checkMemoryUsage method was removed. Since the root cause of the
   // memory leak lies in the Parquet library's internal C++ memory management,
   // memory monitoring is not an effective fix.
   
   // flushBatch writes the given batch to the Parquet file using the latest Arrow Go v18 API
   func (p *ParquetProcessor) flushBatch(entities []model.Event) error {
        if len(entities) == 0 {
                return nil
        }

        // Check that the Arrow schema has been initialized
        if p.arrowSchema == nil {
                return fmt.Errorf("arrow schema not initialized")
        }

        // Check that the Arrow writer has been initialized
        if p.arrowWriter == nil {
                return fmt.Errorf("arrow writer not initialized")
        }

        // Current time
        now := time.Now().UnixMilli()

        // Create the Arrow record builder
        if p.recordBuilder == nil {
                p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)
        }

        // Reset the builder state: release the previous builder and create a fresh one
        p.recordBuilder.Release()
        p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)

        // Fill in the data field by field
        for i, field := range p.arrowSchema.Fields() {
                fieldName := field.Name

                // The timestamp field is handled specially
                if fieldName == "timestamp" {
                        builder := p.recordBuilder.Field(i).(*array.Int64Builder)
                        for j := 0; j < len(entities); j++ {
                                builder.Append(now)
                        }
                        continue
                }
   
                // Fill data according to the field type
                switch field.Type.ID() {
                case arrow.STRING:
                        builder := p.recordBuilder.Field(i).(*array.StringBuilder)
                        for _, entity := range entities {
                                val, ok := entity.Values[fieldName]
                                if !ok || val == nil {
                                        builder.AppendNull()
                                } else {
                                        var strVal string
                                        switch v := val.(type) {
                                        case string:
                                                strVal = v
                                        default:
                                                strVal = fmt.Sprintf("%v", v)
                                        }
                                        builder.Append(strVal)
                                }
                        }

                case arrow.INT32:
                        builder := p.recordBuilder.Field(i).(*array.Int32Builder)
                        for _, entity := range entities {
                                val, ok := entity.Values[fieldName]
                                if !ok || val == nil {
                                        builder.AppendNull()
                                } else {
                                        var intVal int32
                                        switch v := val.(type) {
                                        case int:
                                                intVal = int32(v)
                                        case int64:
                                                intVal = int32(v)
                                        case float64:
                                                intVal = int32(v)
                                        case string:
                                                var f float64
                                                if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                                                        intVal = int32(f)
                                                }
                                        }
                                        builder.Append(intVal)
                                }
                        }
   
                case arrow.INT64:
                        builder := p.recordBuilder.Field(i).(*array.Int64Builder)
                        for _, entity := range entities {
                                val, ok := entity.Values[fieldName]
                                if !ok || val == nil {
                                        builder.AppendNull()
                                } else {
                                        var int64Val int64
                                        switch v := val.(type) {
                                        case int:
                                                int64Val = int64(v)
                                        case int64:
                                                int64Val = v
                                        case float64:
                                                int64Val = int64(v)
                                        case time.Time:
                                                int64Val = v.UnixMilli()
                                        case string:
                                                var f float64
                                                if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
                                                        int64Val = int64(f)
                                                } else {
                                                        // Try to parse as a time string
                                                        formats := []string{
                                                                time.RFC3339,
                                                                "2006-01-02 15:04:05",
                                                                "2006/01/02 15:04:05",
                                                                "01/02/2006 15:04:05",
                                                                "02/01/2006 15:04:05",
                                                        }
                                                        for _, format := range formats {
                                                                if t, err := time.Parse(format, v); err == nil {
                                                                        int64Val = t.UnixMilli()
                                                                        break
                                                                }
                                                        }
                                                }
                                        }
                                        builder.Append(int64Val)
                                }
                        }
   
                case arrow.FLOAT64:
                        builder := p.recordBuilder.Field(i).(*array.Float64Builder)
                        for _, entity := range entities {
                                val, ok := entity.Values[fieldName]
                                if !ok || val == nil {
                                        builder.AppendNull()
                                } else {
                                        var floatVal float64
                                        switch v := val.(type) {
                                        case float64:
                                                floatVal = v
                                        case float32:
                                                floatVal = float64(v)
                                        case int:
                                                floatVal = float64(v)
                                        case int64:
                                                floatVal = float64(v)
                                        case string:
                                                fmt.Sscanf(v, "%f", &floatVal)
                                        }
                                        builder.Append(floatVal)
                                }
                        }

                case arrow.BOOL:
                        builder := p.recordBuilder.Field(i).(*array.BooleanBuilder)
                        for _, entity := range entities {
                                val, ok := entity.Values[fieldName]
                                if !ok || val == nil {
                                        builder.AppendNull()
                                } else {
                                        var boolVal bool
                                        switch v := val.(type) {
                                        case bool:
                                                boolVal = v
                                        case string:
                                                switch v {
                                                case "true", "True", "TRUE", "1":
                                                        boolVal = true
                                                case "false", "False", "FALSE", "0":
                                                        boolVal = false
                                                }
                                        case int:
                                                boolVal = v != 0
                                        case float64:
                                                boolVal = v != 0
                                        }
                                        builder.Append(boolVal)
                                }
                        }

                default:
                        log.Printf("Warning: field %s has unsupported type %s", fieldName, field.Type.ID())
                }
        }
   
        // Build the Arrow record batch
        recordBatch := p.recordBuilder.NewRecordBatch()
        defer recordBatch.Release()

        // Write to the Parquet file using the latest Arrow Go v18 API
        if err := p.arrowWriter.Write(recordBatch); err != nil {
                return fmt.Errorf("failed to write record batch to parquet: %w", err)
        }

        // Record the number of successfully written records via TotalCounter.CountN
        (*p.TotalCounter).CountN(int64(len(entities)))

        return nil
   }
   
   // getCompressionType maps a compression algorithm name to the corresponding Parquet compression type
   func getCompressionType(compression string) compress.Compression {
        switch compression {
        case "SNAPPY":
                return compress.Codecs.Snappy
        case "GZIP":
                return compress.Codecs.Gzip
        case "ZSTD":
                return compress.Codecs.Zstd
        default:
                // Default to SNAPPY compression
                return compress.Codecs.Snappy
        }
   }
   
   // Close finishes the processor's work and flushes any pending data
   func (p *ParquetProcessor) Close() error {
        var closeErrors []error

        // Release the Arrow record builder
        if p.recordBuilder != nil {
                p.recordBuilder.Release()
                p.recordBuilder = nil
        }

        // Close the Arrow Parquet writer
        if p.arrowWriter != nil {
                log.Printf("ParquetProcessor[%d] Closing Arrow Parquet writer... current file: %s", p.instanceID, p.filePath)
                if err := p.arrowWriter.Close(); err != nil {
                        log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer: %v", p.instanceID, err)
                        closeErrors = append(closeErrors, fmt.Errorf("failed to close arrow parquet writer: %w", err))
                }
                p.arrowWriter = nil
        }

        // Close the file
        if p.currentFile != nil {
                log.Printf("ParquetProcessor[%d] Closing file: %s", p.instanceID, p.filePath)
                if err := p.currentFile.Close(); err != nil {
                        log.Printf("ParquetProcessor[%d] Error closing file: %v", p.instanceID, err)
                        closeErrors = append(closeErrors, fmt.Errorf("failed to close file: %w", err))
                }
                p.currentFile = nil
        }

        // Drop the Arrow schema reference
        if p.arrowSchema != nil {
                p.arrowSchema = nil
        }

        // Drop the Parquet schema reference
        if p.parquetSchema != nil {
                p.parquetSchema = nil
        }

        // Drop the writer properties reference
        if p.writerProps != nil {
                p.writerProps = nil
        }

        if len(closeErrors) > 0 {
                log.Printf("ParquetProcessor[%d] encountered %d errors while closing", p.instanceID, len(closeErrors))
                return fmt.Errorf("close errors: %v", closeErrors)
        }

        log.Printf("ParquetProcessor[%d] fully closed, file: %s", p.instanceID, p.filePath)
        return nil
   }
   
   // UpdateConfig updates the processor configuration
   func (p *ParquetProcessor) UpdateConfig(configData map[string]interface{}) error {
        // Parse the configuration
        configBytes, err := json.Marshal(configData)
        if err != nil {
                return fmt.Errorf("failed to marshal config: %w", err)
        }

        var newConfig ParquetConfig
        if err := json.Unmarshal(configBytes, &newConfig); err != nil {
                return fmt.Errorf("failed to unmarshal config: %w", err)
        }

        // No flush needed here, since there is no batch buffer

        // Initialization is only needed on the first configuration
        needReinitialization := p.dictionary == nil

        if needReinitialization {
                // Close existing resources
                if p.arrowWriter != nil {
                        if err := p.arrowWriter.Close(); err != nil {
                                log.Printf("ParquetProcessor[%d] Error closing Arrow Parquet writer during config update: %v", p.instanceID, err)
                        }
                        p.arrowWriter = nil
                }

                if p.currentFile != nil {
                        if err := p.currentFile.Close(); err != nil {
                                log.Printf("ParquetProcessor[%d] Error closing file during config update: %v", p.instanceID, err)
                        }
                        p.currentFile = nil
                }

                // Update the data directory
                if newConfig.DataDir != "" {
                        p.dataDir = newConfig.DataDir
                }

                // Load the dictionary
                if p.dictionary == nil {
                        dictionary, err := common.GetDic(newConfig.DictionaryPath)
                        if err != nil {
                                return fmt.Errorf("failed to load dictionary from %s: %w", newConfig.DictionaryPath, err)
                        }
                        p.dictionary = dictionary

                        // Rebuild the Arrow and Parquet schemas
                        p.arrowSchema = buildArrowSchema(p.dictionary)
                        p.parquetSchema = buildParquetSchema(p.dictionary)
                }
        }

        // Update the batch size (no longer used for batching, but kept in the config)
        if newConfig.BatchSize > 0 {
                p.batchSize = newConfig.BatchSize
        }

        // Update the compression algorithm
        if newConfig.Compression != "" {
                p.config.Compression = newConfig.Compression
        }

        // Update the remaining config fields
        if newConfig.DataDir != "" {
                dirPath, pathErr := common.GetDataDirPath(p.dataDir)
                if pathErr != nil {
                        fmt.Println("data pathErr, using default", pathErr)
                }
                p.config.DataDir = dirPath
        }
        if newConfig.DictionaryPath != "" {
                p.config.DictionaryPath = newConfig.DictionaryPath
        }

        // Re-initialize the Parquet file if needed
        if p.arrowWriter == nil || p.currentFile == nil {
                if err := p.initializeParquetFile(); err != nil {
                        return fmt.Errorf("failed to initialize Parquet file: %w", err)
                }
        }

        log.Printf("ParquetProcessor[%d] configuration updated successfully", p.instanceID)
        return nil
   }
   
   // ReadParquetFile reads a Parquet file and returns the event data.
   // Note: this still needs to be implemented; it is commented out for now to avoid compile errors.
   /*
   func (p *ParquetProcessor) ReadParquetFile(filePath string) ([]model.Event, error) {
        // A real implementation needs the proper Arrow read API;
        // return an empty result for now
        return nil, fmt.Errorf("read functionality not implemented yet")
   }
   */
   ```
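
   As a side note, one way to check how much of the resident memory is actually held by Arrow allocations (as opposed to the parquet writer's internal buffers) is to wrap the allocator in arrow-go's `memory.CheckedAllocator`. A small debugging sketch, not part of the processor above:

   ```go
   // Debugging sketch: memory.NewCheckedAllocator wraps another allocator and
   // tracks the bytes currently allocated through it.
   mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
   p := &ParquetProcessor{mem: mem} // illustrative; real setup goes through NewParquetProcessor

   // ... write some batches ...

   // CurrentAlloc reports the bytes still live in this allocator; if this stays
   // small while RSS keeps growing, the growth is happening outside the Arrow allocator.
   log.Printf("arrow-tracked bytes still live: %d", mem.CurrentAlloc())
   ```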
        

