This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new a444a83d5 fix: close pg conn after rows (#4274)
a444a83d5 is described below

commit a444a83d5416a6835152fa4f67be88845711a74e
Author: long2ice <[email protected]>
AuthorDate: Mon Jan 30 17:40:59 2023 +0800

    fix: close pg conn after rows (#4274)
    
    * fix: close pg conn after rows
    
    * fix: wrap for loop rows in local func
---
 plugins/starrocks/tasks.go | 67 ++++++++++++++++++++++++++--------------------
 1 file changed, 38 insertions(+), 29 deletions(-)

diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 38c59b524..afdffb98c 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -279,39 +279,46 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
        offset := 0
        var err error
        for {
-               var rows *sql.Rows
                var data []map[string]interface{}
                // select data from db
-               rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
-               if err != nil {
-                       return err
-               }
-               cols, err := rows.Columns()
-               if err != nil {
-                       return err
-               }
-               for rows.Next() {
-                       row := make(map[string]interface{})
-                       columns := make([]interface{}, len(cols))
-                       columnPointers := make([]interface{}, len(cols))
-                       for i := range columns {
-                               dataType := columnMap[cols[i]]
-                               if strings.HasPrefix(dataType, "array") {
-                                       var arr []string
-                                       columns[i] = &arr
-                                       columnPointers[i] = pq.Array(&arr)
-                               } else {
-                                       columnPointers[i] = &columns[i]
-                               }
+               err = func() error {
+                       var rows *sql.Rows
+                       rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
+                       if err != nil {
+                               return err
                        }
-                       err = rows.Scan(columnPointers...)
+                       defer rows.Close()
+                       cols, err := rows.Columns()
                        if err != nil {
                                return err
                        }
-                       for i, colName := range cols {
-                               row[colName] = columns[i]
+                       for rows.Next() {
+                               row := make(map[string]interface{})
+                               columns := make([]interface{}, len(cols))
+                               columnPointers := make([]interface{}, len(cols))
+                               for i := range columns {
+                                       dataType := columnMap[cols[i]]
+                                       if strings.HasPrefix(dataType, "array") {
+                                               var arr []string
+                                               columns[i] = &arr
+                                               columnPointers[i] = pq.Array(&arr)
+                                       } else {
+                                               columnPointers[i] = &columns[i]
+                                       }
+                               }
+                               err = rows.Scan(columnPointers...)
+                               if err != nil {
+                                       return err
+                               }
+                               for i, colName := range cols {
+                                       row[colName] = columns[i]
+                               }
+                               data = append(data, row)
                        }
-                       data = append(data, row)
+                       return nil
+               }()
+               if err != nil {
+                       return err
                }
                if len(data) == 0 {
                        c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -400,6 +407,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
        if err != nil {
                return err
        }
+       defer rows.Close()
        var sourceCount int
        for rows.Next() {
                err = rows.Scan(&sourceCount)
@@ -407,13 +415,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
                        return err
                }
        }
-       rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+       rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
        if err != nil {
                return err
        }
+       defer rowsStarRocks.Close()
        var starrocksCount int
-       for rows.Next() {
-               err = rows.Scan(&starrocksCount)
+       for rowsStarRocks.Next() {
+               err = rowsStarRocks.Scan(&starrocksCount)
                if err != nil {
                        return err
                }

Reply via email to