This is an automated email from the ASF dual-hosted git repository. klesh pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push: new 6ebaf9bed feat: add ExtractQDevS3DataMeta (#8459) 6ebaf9bed is described below commit 6ebaf9bed9740b1d94c7539a829777c7c5c7caf5 Author: Warren Chen <warren.chen...@gmail.com> AuthorDate: Tue Jun 3 20:39:12 2025 +0800 feat: add ExtractQDevS3DataMeta (#8459) --- backend/plugins/q_dev/impl/impl.go | 1 + backend/plugins/q_dev/tasks/s3_data_extractor.go | 241 +++++++++++++++++++++++ 2 files changed, 242 insertions(+) diff --git a/backend/plugins/q_dev/impl/impl.go b/backend/plugins/q_dev/impl/impl.go index c8ddea5c1..c5dc8e91d 100644 --- a/backend/plugins/q_dev/impl/impl.go +++ b/backend/plugins/q_dev/impl/impl.go @@ -80,6 +80,7 @@ func (p QDev) ScopeConfig() dal.Tabler { func (p QDev) SubTaskMetas() []plugin.SubTaskMeta { return []plugin.SubTaskMeta{ tasks.CollectQDevS3FilesMeta, + tasks.ExtractQDevS3DataMeta, } } diff --git a/backend/plugins/q_dev/tasks/s3_data_extractor.go b/backend/plugins/q_dev/tasks/s3_data_extractor.go new file mode 100644 index 000000000..748e9b6c4 --- /dev/null +++ b/backend/plugins/q_dev/tasks/s3_data_extractor.go @@ -0,0 +1,241 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "encoding/csv" + "fmt" + "github.com/apache/incubator-devlake/core/dal" + "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" + "github.com/apache/incubator-devlake/plugins/q_dev/models" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "io" + "strconv" + "strings" + "time" +) + +var _ plugin.SubTaskEntryPoint = ExtractQDevS3Data + +// ExtractQDevS3Data 从S3下载CSV数据并解析 +func ExtractQDevS3Data(taskCtx plugin.SubTaskContext) errors.Error { + data := taskCtx.GetData().(*QDevTaskData) + db := taskCtx.GetDal() + + // 查询未处理的文件元数据 + cursor, err := db.Cursor( + dal.From(&models.QDevS3FileMeta{}), + dal.Where("connection_id = ? AND processed = ?", data.Options.ConnectionId, false), + ) + if err != nil { + return errors.Default.Wrap(err, "failed to get file metadata cursor") + } + defer cursor.Close() + + taskCtx.SetProgress(0, -1) + + // 处理每个文件 + for cursor.Next() { + fileMeta := &models.QDevS3FileMeta{} + err = db.Fetch(cursor, fileMeta) + if err != nil { + return errors.Default.Wrap(err, "failed to fetch file metadata") + } + + // 获取文件内容 + getInput := &s3.GetObjectInput{ + Bucket: aws.String(data.S3Client.Bucket), + Key: aws.String(fileMeta.S3Path), + } + + getResult, err := data.S3Client.S3.GetObject(getInput) + if err != nil { + return errors.Convert(err) + } + + // 处理CSV文件 + err = processCSVData(taskCtx, db, getResult.Body, fileMeta) + if err != nil { + return errors.Default.Wrap(err, fmt.Sprintf("failed to process CSV file %s", fileMeta.FileName)) + } + + // 更新文件处理状态 + fileMeta.Processed = true + now := time.Now() + fileMeta.ProcessedTime = &now + err = db.Update(fileMeta) + if err != nil { + return errors.Default.Wrap(err, "failed to update file metadata") + } + + taskCtx.IncProgress(1) + } + + return nil +} + +// 处理CSV文件 +func processCSVData(taskCtx plugin.SubTaskContext, db dal.Dal, reader io.ReadCloser, fileMeta *models.QDevS3FileMeta) errors.Error { + defer reader.Close() + + csvReader := csv.NewReader(reader) + // 使用默认的逗号分隔符,不需要设置 Comma + csvReader.LazyQuotes = true // 允许非标准引号处理 + csvReader.FieldsPerRecord = -1 // 允许每行字段数不同 + + // 读取标头 + headers, err := csvReader.Read() + fmt.Printf("headers: %+v\n", headers) + if err != nil { + return errors.Convert(err) + } + + // 逐行读取数据 + for { + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return errors.Convert(err) + } + + // 创建用户数据对象 + userData, err := createUserData(headers, record, fileMeta) + if err != nil { + return errors.Default.Wrap(err, "failed to create user data") + } + + // 保存到数据库 + err = db.Create(userData) + if err != nil { + return errors.Default.Wrap(err, "failed to save user data") + } + } + + return nil +} + +// 从CSV记录创建用户数据对象 +func createUserData(headers []string, record []string, fileMeta *models.QDevS3FileMeta) (*models.QDevUserData, errors.Error) { + userData := &models.QDevUserData{ + ConnectionId: fileMeta.ConnectionId, + } + + // 创建字段映射 + fieldMap := make(map[string]string) + for i, header := range headers { + if i < len(record) { + // 打印每个header和对应的值,帮助调试 + fmt.Printf("Mapping header[%d]: '%s' -> '%s'\n", i, header, record[i]) + fieldMap[header] = record[i] + // 同时添加去除空格的版本 + trimmedHeader := strings.TrimSpace(header) + if trimmedHeader != header { + fmt.Printf("Also adding trimmed header: '%s'\n", trimmedHeader) + fieldMap[trimmedHeader] = record[i] + } + } + } + + // 设置必要字段 + var err error + var ok bool + + // 设置UserId + userData.UserId, ok = fieldMap["UserId"] + if !ok { + return nil, errors.Default.New("UserId not found in CSV record") + } + + // 设置Date + dateStr, ok := fieldMap["Date"] + if !ok { + return nil, errors.Default.New("Date not found in CSV record") + } + + userData.Date, err = parseDate(dateStr) + if err != nil { + return nil, errors.Default.Wrap(err, "failed to parse date") + } + + // 设置指标字段 + userData.CodeReview_FindingsCount = parseInt(fieldMap, "CodeReview_FindingsCount") + userData.CodeReview_SucceededEventCount = parseInt(fieldMap, "CodeReview_SucceededEventCount") + userData.InlineChat_AcceptanceEventCount = parseInt(fieldMap, "InlineChat_AcceptanceEventCount") + userData.InlineChat_AcceptedLineAdditions = parseInt(fieldMap, "InlineChat_AcceptedLineAdditions") + userData.InlineChat_AcceptedLineDeletions = parseInt(fieldMap, "InlineChat_AcceptedLineDeletions") + userData.InlineChat_DismissalEventCount = parseInt(fieldMap, "InlineChat_DismissalEventCount") + userData.InlineChat_DismissedLineAdditions = parseInt(fieldMap, "InlineChat_DismissedLineAdditions") + userData.InlineChat_DismissedLineDeletions = parseInt(fieldMap, "InlineChat_DismissedLineDeletions") + userData.InlineChat_RejectedLineAdditions = parseInt(fieldMap, "InlineChat_RejectedLineAdditions") + userData.InlineChat_RejectedLineDeletions = parseInt(fieldMap, "InlineChat_RejectedLineDeletions") + userData.InlineChat_RejectionEventCount = parseInt(fieldMap, "InlineChat_RejectionEventCount") + userData.InlineChat_TotalEventCount = parseInt(fieldMap, "InlineChat_TotalEventCount") + userData.Inline_AICodeLines = parseInt(fieldMap, "Inline_AICodeLines") + userData.Inline_AcceptanceCount = parseInt(fieldMap, "Inline_AcceptanceCount") + userData.Inline_SuggestionsCount = parseInt(fieldMap, "Inline_SuggestionsCount") + + return userData, nil +} + +// 解析日期 +func parseDate(dateStr string) (time.Time, errors.Error) { + // 尝试常见的日期格式 + formats := []string{ + "2006-01-02", + "2006/01/02", + "01/02/2006", + "01-02-2006", + time.RFC3339, + } + + for _, format := range formats { + date, err := time.Parse(format, dateStr) + if err == nil { + return date, nil + } + } + + return time.Time{}, errors.Default.New(fmt.Sprintf("failed to parse date: %s", dateStr)) +} + +// 解析整数 +func parseInt(fieldMap map[string]string, field string) int { + value, ok := fieldMap[field] + if !ok { + return 0 + } + + intValue, err := strconv.Atoi(value) + if err != nil { + return 0 + } + + return intValue +} + +var ExtractQDevS3DataMeta = plugin.SubTaskMeta{ + Name: "extractQDevS3Data", + EntryPoint: ExtractQDevS3Data, + EnabledByDefault: true, + Description: "Extract data from S3 CSV files and save to database", + DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS}, +}