This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.15 by this push:
new 5aab77e3d fix: close pg conn after rows (#4275)
5aab77e3d is described below
commit 5aab77e3d615b5c46b739ef3ad425e4748268b5d
Author: long2ice <[email protected]>
AuthorDate: Mon Jan 30 17:40:30 2023 +0800
fix: close pg conn after rows (#4275)
* fix: close pg conn after rows
* fix: wrap for loop rows in local func
---
plugins/starrocks/tasks/tasks.go | 77 ++++++++++++++++++++++------------------
1 file changed, 43 insertions(+), 34 deletions(-)
diff --git a/plugins/starrocks/tasks/tasks.go b/plugins/starrocks/tasks/tasks.go
index 4995e7ddf..dab2ed317 100644
--- a/plugins/starrocks/tasks/tasks.go
+++ b/plugins/starrocks/tasks/tasks.go
@@ -281,44 +281,51 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
offset := 0
var err error
for {
- var rows dal.Rows
var data []map[string]interface{}
// select data from db
- rows, err = db.Cursor(
- dal.From(table),
- dal.Orderby(orderBy),
- dal.Limit(config.BatchSize),
- dal.Offset(offset),
- )
- if err != nil {
- return err
- }
- cols, err := rows.Columns()
- if err != nil {
- return err
- }
- for rows.Next() {
- row := make(map[string]interface{})
- columns := make([]interface{}, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- dataType := columnMap[cols[i]]
- if strings.HasPrefix(dataType, "array") {
- var arr []string
- columns[i] = &arr
- columnPointers[i] = pq.Array(&arr)
- } else {
- columnPointers[i] = &columns[i]
- }
+ err = func() error {
+ var rows dal.Rows
+ rows, err = db.Cursor(
+ dal.From(table),
+ dal.Orderby(orderBy),
+ dal.Limit(config.BatchSize),
+ dal.Offset(offset),
+ )
+ if err != nil {
+ return err
}
- err = rows.Scan(columnPointers...)
+ defer rows.Close()
+ cols, err := rows.Columns()
if err != nil {
return err
}
- for i, colName := range cols {
- row[colName] = columns[i]
+ for rows.Next() {
+ row := make(map[string]interface{})
+ columns := make([]interface{}, len(cols))
+ columnPointers := make([]interface{}, len(cols))
+ for i := range columns {
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array") {
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] = pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
+ }
+ }
+ err = rows.Scan(columnPointers...)
+ if err != nil {
+ return err
+ }
+ for i, colName := range cols {
+ row[colName] = columns[i]
+ }
+ data = append(data, row)
}
- data = append(data, row)
+ return nil
+ }()
+ if err != nil {
+ return err
}
if len(data) == 0 {
c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -410,6 +417,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
if err != nil {
return err
}
+ defer rows.Close()
var sourceCount int
for rows.Next() {
err = rows.Scan(&sourceCount)
@@ -417,13 +425,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrock
return err
}
}
- rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+ rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
if err != nil {
return err
}
+ defer rowsStarRocks.Close()
var starrocksCount int
- for rows.Next() {
- err = rows.Scan(&starrocksCount)
+ for rowsStarRocks.Next() {
+ err = rowsStarRocks.Scan(&starrocksCount)
if err != nil {
return err
}