This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ebaf9bed feat: add ExtractQDevS3DataMeta (#8459)
6ebaf9bed is described below

commit 6ebaf9bed9740b1d94c7539a829777c7c5c7caf5
Author: Warren Chen <warren.chen...@gmail.com>
AuthorDate: Tue Jun 3 20:39:12 2025 +0800

    feat: add ExtractQDevS3DataMeta (#8459)
---
 backend/plugins/q_dev/impl/impl.go               |   1 +
 backend/plugins/q_dev/tasks/s3_data_extractor.go | 241 +++++++++++++++++++++++
 2 files changed, 242 insertions(+)

diff --git a/backend/plugins/q_dev/impl/impl.go b/backend/plugins/q_dev/impl/impl.go
index c8ddea5c1..c5dc8e91d 100644
--- a/backend/plugins/q_dev/impl/impl.go
+++ b/backend/plugins/q_dev/impl/impl.go
@@ -80,6 +80,7 @@ func (p QDev) ScopeConfig() dal.Tabler {
 func (p QDev) SubTaskMetas() []plugin.SubTaskMeta {
        return []plugin.SubTaskMeta{
                tasks.CollectQDevS3FilesMeta,
+               tasks.ExtractQDevS3DataMeta,
        }
 }
 
diff --git a/backend/plugins/q_dev/tasks/s3_data_extractor.go b/backend/plugins/q_dev/tasks/s3_data_extractor.go
new file mode 100644
index 000000000..748e9b6c4
--- /dev/null
+++ b/backend/plugins/q_dev/tasks/s3_data_extractor.go
@@ -0,0 +1,241 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "encoding/csv"
+       "fmt"
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/plugins/q_dev/models"
+       "github.com/aws/aws-sdk-go/aws"
+       "github.com/aws/aws-sdk-go/service/s3"
+       "io"
+       "strconv"
+       "strings"
+       "time"
+)
+
+// Compile-time check that ExtractQDevS3Data is assignable to plugin.SubTaskEntryPoint.
+var _ plugin.SubTaskEntryPoint = ExtractQDevS3Data
+
+// ExtractQDevS3Data downloads every CSV file that is recorded in
+// QDevS3FileMeta for the current connection and not yet marked as processed,
+// stores its rows through processCSVData, and then flags the file as
+// processed together with the processing time.
+//
+// Processing stops at the first failure and that error is returned. Files
+// completed before the failure keep their processed flag.
+//
+// NOTE(review): rows are inserted one by one and the processed flag is only
+// set after the whole file succeeded, so a file that fails part-way is read
+// again from the start on the next run — confirm that re-inserting the rows
+// already stored is safe.
+func ExtractQDevS3Data(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*QDevTaskData)
+       db := taskCtx.GetDal()
+
+       // Select the metadata rows of files that have not been processed yet.
+       cursor, err := db.Cursor(
+               dal.From(&models.QDevS3FileMeta{}),
+               dal.Where("connection_id = ? AND processed = ?", data.Options.ConnectionId, false),
+       )
+       if err != nil {
+               return errors.Default.Wrap(err, "failed to get file metadata cursor")
+       }
+       defer cursor.Close()
+
+       taskCtx.SetProgress(0, -1)
+
+       // Handle one file per cursor row.
+       for cursor.Next() {
+               fileMeta := &models.QDevS3FileMeta{}
+               if err := db.Fetch(cursor, fileMeta); err != nil {
+                       return errors.Default.Wrap(err, "failed to fetch file metadata")
+               }
+
+               // Download the object. processCSVData closes the returned body.
+               getResult, getErr := data.S3Client.S3.GetObject(&s3.GetObjectInput{
+                       Bucket: aws.String(data.S3Client.Bucket),
+                       Key:    aws.String(fileMeta.S3Path),
+               })
+               if getErr != nil {
+                       // Name the object so a failed download can be traced to a file.
+                       return errors.Default.Wrap(getErr, fmt.Sprintf("failed to download S3 object %s", fileMeta.S3Path))
+               }
+
+               // Parse the CSV content and store its rows.
+               if err := processCSVData(taskCtx, db, getResult.Body, fileMeta); err != nil {
+                       return errors.Default.Wrap(err, fmt.Sprintf("failed to process CSV file %s", fileMeta.FileName))
+               }
+
+               // Record that the file has been handled.
+               now := time.Now()
+               fileMeta.Processed = true
+               fileMeta.ProcessedTime = &now
+               if err := db.Update(fileMeta); err != nil {
+                       return errors.Default.Wrap(err, "failed to update file metadata")
+               }
+
+               taskCtx.IncProgress(1)
+       }
+
+       return nil
+}
+
+// processCSVData parses the CSV stream in reader and stores one
+// QDevUserData row per data record.
+//
+// The first record is treated as the header row. reader is closed before the
+// function returns. A stream without any record (no header row) is treated
+// as a file without data and is not an error. Any read, conversion or
+// database failure aborts processing and is returned; rows created before
+// the failure are left in place.
+func processCSVData(taskCtx plugin.SubTaskContext, db dal.Dal, reader io.ReadCloser, fileMeta *models.QDevS3FileMeta) errors.Error {
+       defer reader.Close()
+
+       csvReader := csv.NewReader(reader)
+       // The default comma delimiter is used, so Comma is left untouched.
+       csvReader.LazyQuotes = true    // tolerate non-standard quoting
+       csvReader.FieldsPerRecord = -1 // allow a varying number of fields per record
+
+       // Read the header row.
+       headers, err := csvReader.Read()
+       if err == io.EOF {
+               // Empty file: there is nothing to import.
+               return nil
+       }
+       if err != nil {
+               return errors.Convert(err)
+       }
+
+       // Read the data records one at a time.
+       for {
+               record, readErr := csvReader.Read()
+               if readErr == io.EOF {
+                       break
+               }
+               if readErr != nil {
+                       return errors.Convert(readErr)
+               }
+
+               // Convert the record into a user data object.
+               userData, convErr := createUserData(headers, record, fileMeta)
+               if convErr != nil {
+                       return errors.Default.Wrap(convErr, "failed to create user data")
+               }
+
+               // Persist it.
+               if saveErr := db.Create(userData); saveErr != nil {
+                       return errors.Default.Wrap(saveErr, "failed to save user data")
+               }
+       }
+
+       return nil
+}
+
+// createUserData builds a QDevUserData row from a single CSV record.
+//
+// headers and record are matched by position; headers that lie beyond the end
+// of a shorter record are ignored. Header names are compared after removing
+// surrounding whitespace. The UserId and Date columns are required and an
+// error is returned when either is missing or the date cannot be parsed.
+// Metric columns are read through parseInt.
+func createUserData(headers []string, record []string, fileMeta *models.QDevS3FileMeta) (*models.QDevUserData, errors.Error) {
+       userData := &models.QDevUserData{
+               ConnectionId: fileMeta.ConnectionId,
+       }
+
+       // Map each header name to the value at the same position. Every key
+       // looked up below is free of surrounding whitespace, so storing the
+       // trimmed header name is sufficient.
+       fieldMap := make(map[string]string, len(headers))
+       for i, header := range headers {
+               if i >= len(record) {
+                       break
+               }
+               fieldMap[strings.TrimSpace(header)] = record[i]
+       }
+
+       // Required field: UserId.
+       userId, ok := fieldMap["UserId"]
+       if !ok {
+               return nil, errors.Default.New("UserId not found in CSV record")
+       }
+       userData.UserId = userId
+
+       // Required field: Date.
+       dateStr, ok := fieldMap["Date"]
+       if !ok {
+               return nil, errors.Default.New("Date not found in CSV record")
+       }
+       date, err := parseDate(dateStr)
+       if err != nil {
+               return nil, errors.Default.Wrap(err, "failed to parse date")
+       }
+       userData.Date = date
+
+       // Metric fields.
+       userData.CodeReview_FindingsCount = parseInt(fieldMap, "CodeReview_FindingsCount")
+       userData.CodeReview_SucceededEventCount = parseInt(fieldMap, "CodeReview_SucceededEventCount")
+       userData.InlineChat_AcceptanceEventCount = parseInt(fieldMap, "InlineChat_AcceptanceEventCount")
+       userData.InlineChat_AcceptedLineAdditions = parseInt(fieldMap, "InlineChat_AcceptedLineAdditions")
+       userData.InlineChat_AcceptedLineDeletions = parseInt(fieldMap, "InlineChat_AcceptedLineDeletions")
+       userData.InlineChat_DismissalEventCount = parseInt(fieldMap, "InlineChat_DismissalEventCount")
+       userData.InlineChat_DismissedLineAdditions = parseInt(fieldMap, "InlineChat_DismissedLineAdditions")
+       userData.InlineChat_DismissedLineDeletions = parseInt(fieldMap, "InlineChat_DismissedLineDeletions")
+       userData.InlineChat_RejectedLineAdditions = parseInt(fieldMap, "InlineChat_RejectedLineAdditions")
+       userData.InlineChat_RejectedLineDeletions = parseInt(fieldMap, "InlineChat_RejectedLineDeletions")
+       userData.InlineChat_RejectionEventCount = parseInt(fieldMap, "InlineChat_RejectionEventCount")
+       userData.InlineChat_TotalEventCount = parseInt(fieldMap, "InlineChat_TotalEventCount")
+       userData.Inline_AICodeLines = parseInt(fieldMap, "Inline_AICodeLines")
+       userData.Inline_AcceptanceCount = parseInt(fieldMap, "Inline_AcceptanceCount")
+       userData.Inline_SuggestionsCount = parseInt(fieldMap, "Inline_SuggestionsCount")
+
+       return userData, nil
+}
+
+// parseDate converts dateStr into a time.Time by trying a fixed list of
+// layouts in order and returning the first successful parse. An error that
+// quotes dateStr is returned when no layout matches.
+func parseDate(dateStr string) (time.Time, errors.Error) {
+       // Candidate layouts, tried from first to last.
+       layouts := [...]string{
+               "2006-01-02",
+               "2006/01/02",
+               "01/02/2006",
+               "01-02-2006",
+               time.RFC3339,
+       }
+
+       for i := range layouts {
+               if parsed, parseErr := time.Parse(layouts[i], dateStr); parseErr == nil {
+                       return parsed, nil
+               }
+       }
+
+       return time.Time{}, errors.Default.New(fmt.Sprintf("failed to parse date: %s", dateStr))
+}
+
+// parseInt returns the integer stored under field in fieldMap.
+//
+// Surrounding whitespace in the value is ignored. The result is 0 when the
+// field is absent or its value is not a valid base-10 integer; this lenient
+// fallback is deliberate so that a missing or malformed metric column does
+// not reject the whole record.
+func parseInt(fieldMap map[string]string, field string) int {
+       value, ok := fieldMap[field]
+       if !ok {
+               return 0
+       }
+
+       intValue, err := strconv.Atoi(strings.TrimSpace(value))
+       if err != nil {
+               return 0
+       }
+
+       return intValue
+}
+
+// ExtractQDevS3DataMeta is the subtask descriptor whose entry point is
+// ExtractQDevS3Data. It is enabled by default and tagged with the cross
+// domain type.
+var ExtractQDevS3DataMeta = plugin.SubTaskMeta{
+       Name:             "extractQDevS3Data",
+       EntryPoint:       ExtractQDevS3Data,
+       EnabledByDefault: true,
+       Description:      "Extract data from S3 CSV files and save to database",
+       DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS},
+}

Reply via email to