This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new dafde1097 fix: close pg conn (#4273)
dafde1097 is described below

commit dafde10971cdb8f7fc4a18110fbb338e6468f00e
Author: long2ice <[email protected]>
AuthorDate: Mon Jan 30 17:55:32 2023 +0800

    fix: close pg conn (#4273)
    
    * fix: close pg conn
    
    * fix: close pg conn after rows
    
    * fix: wrap for loop rows in local func
---
 backend/plugins/starrocks/tasks/tasks.go | 87 ++++++++++++++++++--------------
 1 file changed, 50 insertions(+), 37 deletions(-)

diff --git a/backend/plugins/starrocks/tasks/tasks.go 
b/backend/plugins/starrocks/tasks/tasks.go
index 2f9cc3a10..1b504e1c9 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -186,16 +186,20 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, starro
        firstcm := ""
        firstcmName := ""
        var rowsInStarRocks *sql.Rows
+       var rowsInPostgres dal.Rows
        defer func() {
                if rowsInStarRocks != nil {
                        rowsInStarRocks.Close()
                }
+               if rowsInPostgres != nil {
+                       rowsInPostgres.Close()
+               }
        }()
        for _, cm := range columnMetas {
                name := cm.Name()
                if name == updateColumn {
                        // check update column to detect skip or not
-                       rows, err := db.Cursor(
+                       rowsInPostgres, err = db.Cursor(
                                dal.From(table),
                                dal.Select(updateColumn),
                                dal.Limit(1),
@@ -205,8 +209,8 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, starro
                                return nil, "", false, err
                        }
                        var updatedFrom time.Time
-                       if rows.Next() {
-                               err = errors.Convert(rows.Scan(&updatedFrom))
+                       if rowsInPostgres.Next() {
+                               err = 
errors.Convert(rowsInPostgres.Scan(&updatedFrom))
                                if err != nil {
                                        return nil, "", false, err
                                }
@@ -277,44 +281,51 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, 
starrocksTable, starro
        offset := 0
        var err error
        for {
-               var rows dal.Rows
                var data []map[string]interface{}
                // select data from db
-               rows, err = db.Cursor(
-                       dal.From(table),
-                       dal.Orderby(orderBy),
-                       dal.Limit(config.BatchSize),
-                       dal.Offset(offset),
-               )
-               if err != nil {
-                       return err
-               }
-               cols, err := rows.Columns()
-               if err != nil {
-                       return err
-               }
-               for rows.Next() {
-                       row := make(map[string]interface{})
-                       columns := make([]interface{}, len(cols))
-                       columnPointers := make([]interface{}, len(cols))
-                       for i := range columns {
-                               dataType := columnMap[cols[i]]
-                               if strings.HasPrefix(dataType, "array") {
-                                       var arr []string
-                                       columns[i] = &arr
-                                       columnPointers[i] = pq.Array(&arr)
-                               } else {
-                                       columnPointers[i] = &columns[i]
-                               }
+               err = func() error {
+                       var rows dal.Rows
+                       rows, err = db.Cursor(
+                               dal.From(table),
+                               dal.Orderby(orderBy),
+                               dal.Limit(config.BatchSize),
+                               dal.Offset(offset),
+                       )
+                       if err != nil {
+                               return err
                        }
-                       err = rows.Scan(columnPointers...)
+                       defer rows.Close()
+                       cols, err := rows.Columns()
                        if err != nil {
                                return err
                        }
-                       for i, colName := range cols {
-                               row[colName] = columns[i]
+                       for rows.Next() {
+                               row := make(map[string]interface{})
+                               columns := make([]interface{}, len(cols))
+                               columnPointers := make([]interface{}, len(cols))
+                               for i := range columns {
+                                       dataType := columnMap[cols[i]]
+                                       if strings.HasPrefix(dataType, "array") 
{
+                                               var arr []string
+                                               columns[i] = &arr
+                                               columnPointers[i] = 
pq.Array(&arr)
+                                       } else {
+                                               columnPointers[i] = &columns[i]
+                                       }
+                               }
+                               err = rows.Scan(columnPointers...)
+                               if err != nil {
+                                       return err
+                               }
+                               for i, colName := range cols {
+                                       row[colName] = columns[i]
+                               }
+                               data = append(data, row)
                        }
-                       data = append(data, row)
+                       return nil
+               }()
+               if err != nil {
+                       return err
                }
                if len(data) == 0 {
                        c.GetLogger().Warn(nil, "no data found in table %s 
already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -406,6 +417,7 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, 
starrocksTable, starro
        if err != nil {
                return err
        }
+       defer rows.Close()
        var sourceCount int
        for rows.Next() {
                err = rows.Scan(&sourceCount)
@@ -413,13 +425,14 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, 
starrocksTable, starro
                        return err
                }
        }
-       rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", 
starrocksTable))
+       rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from 
%s", starrocksTable))
        if err != nil {
                return err
        }
+       defer rowsStarRocks.Close()
        var starrocksCount int
-       for rows.Next() {
-               err = rows.Scan(&starrocksCount)
+       for rowsStarRocks.Next() {
+               err = rowsStarRocks.Scan(&starrocksCount)
                if err != nil {
                        return err
                }

Reply via email to