This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.15 by this push:
     new 5aab77e3d fix: close pg conn after rows (#4275)
5aab77e3d is described below

commit 5aab77e3d615b5c46b739ef3ad425e4748268b5d
Author: long2ice <[email protected]>
AuthorDate: Mon Jan 30 17:40:30 2023 +0800

    fix: close pg conn after rows (#4275)
    
    * fix: close pg conn after rows
    
    * fix: wrap for loop rows in local func
---
 plugins/starrocks/tasks/tasks.go | 77 ++++++++++++++++++++++------------------
 1 file changed, 43 insertions(+), 34 deletions(-)

diff --git a/plugins/starrocks/tasks/tasks.go b/plugins/starrocks/tasks/tasks.go
index 4995e7ddf..dab2ed317 100644
--- a/plugins/starrocks/tasks/tasks.go
+++ b/plugins/starrocks/tasks/tasks.go
@@ -281,44 +281,51 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
        offset := 0
        var err error
        for {
-               var rows dal.Rows
                var data []map[string]interface{}
                // select data from db
-               rows, err = db.Cursor(
-                       dal.From(table),
-                       dal.Orderby(orderBy),
-                       dal.Limit(config.BatchSize),
-                       dal.Offset(offset),
-               )
-               if err != nil {
-                       return err
-               }
-               cols, err := rows.Columns()
-               if err != nil {
-                       return err
-               }
-               for rows.Next() {
-                       row := make(map[string]interface{})
-                       columns := make([]interface{}, len(cols))
-                       columnPointers := make([]interface{}, len(cols))
-                       for i := range columns {
-                               dataType := columnMap[cols[i]]
-                               if strings.HasPrefix(dataType, "array") {
-                                       var arr []string
-                                       columns[i] = &arr
-                                       columnPointers[i] = pq.Array(&arr)
-                               } else {
-                                       columnPointers[i] = &columns[i]
-                               }
+               err = func() error {
+                       var rows dal.Rows
+                       rows, err = db.Cursor(
+                               dal.From(table),
+                               dal.Orderby(orderBy),
+                               dal.Limit(config.BatchSize),
+                               dal.Offset(offset),
+                       )
+                       if err != nil {
+                               return err
                        }
-                       err = rows.Scan(columnPointers...)
+                       defer rows.Close()
+                       cols, err := rows.Columns()
                        if err != nil {
                                return err
                        }
-                       for i, colName := range cols {
-                               row[colName] = columns[i]
+                       for rows.Next() {
+                               row := make(map[string]interface{})
+                               columns := make([]interface{}, len(cols))
+                               columnPointers := make([]interface{}, len(cols))
+                               for i := range columns {
+                                       dataType := columnMap[cols[i]]
+                                       if strings.HasPrefix(dataType, "array") {
+                                               var arr []string
+                                               columns[i] = &arr
+                                               columnPointers[i] = pq.Array(&arr)
+                                       } else {
+                                               columnPointers[i] = &columns[i]
+                                       }
+                               }
+                               err = rows.Scan(columnPointers...)
+                               if err != nil {
+                                       return err
+                               }
+                               for i, colName := range cols {
+                                       row[colName] = columns[i]
+                               }
+                               data = append(data, row)
                        }
-                       data = append(data, row)
+                       return nil
+               }()
+               if err != nil {
+                       return err
                }
                if len(data) == 0 {
                        c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -410,6 +417,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
        if err != nil {
                return err
        }
+       defer rows.Close()
        var sourceCount int
        for rows.Next() {
                err = rows.Scan(&sourceCount)
@@ -417,13 +425,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
                        return err
                }
        }
-       rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+       rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
        if err != nil {
                return err
        }
+       defer rowsStarRocks.Close()
        var starrocksCount int
-       for rows.Next() {
-               err = rows.Scan(&starrocksCount)
+       for rowsStarRocks.Next() {
+               err = rowsStarRocks.Scan(&starrocksCount)
                if err != nil {
                        return err
                }

Reply via email to