This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new dafde1097 fix: close pg conn (#4273)
dafde1097 is described below
commit dafde10971cdb8f7fc4a18110fbb338e6468f00e
Author: long2ice <[email protected]>
AuthorDate: Mon Jan 30 17:55:32 2023 +0800
fix: close pg conn (#4273)
* fix: close pg conn
* fix: close pg conn after rows
* fix: wrap for loop rows in local func
---
backend/plugins/starrocks/tasks/tasks.go | 87 ++++++++++++++++++--------------
1 file changed, 50 insertions(+), 37 deletions(-)
diff --git a/backend/plugins/starrocks/tasks/tasks.go b/backend/plugins/starrocks/tasks/tasks.go
index 2f9cc3a10..1b504e1c9 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -186,16 +186,20 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
firstcm := ""
firstcmName := ""
var rowsInStarRocks *sql.Rows
+ var rowsInPostgres dal.Rows
defer func() {
if rowsInStarRocks != nil {
rowsInStarRocks.Close()
}
+ if rowsInPostgres != nil {
+ rowsInPostgres.Close()
+ }
}()
for _, cm := range columnMetas {
name := cm.Name()
if name == updateColumn {
// check update column to detect skip or not
- rows, err := db.Cursor(
+ rowsInPostgres, err = db.Cursor(
dal.From(table),
dal.Select(updateColumn),
dal.Limit(1),
@@ -205,8 +209,8 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
return nil, "", false, err
}
var updatedFrom time.Time
- if rows.Next() {
- err = errors.Convert(rows.Scan(&updatedFrom))
+ if rowsInPostgres.Next() {
+ err =
errors.Convert(rowsInPostgres.Scan(&updatedFrom))
if err != nil {
return nil, "", false, err
}
@@ -277,44 +281,51 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starro
offset := 0
var err error
for {
- var rows dal.Rows
var data []map[string]interface{}
// select data from db
- rows, err = db.Cursor(
- dal.From(table),
- dal.Orderby(orderBy),
- dal.Limit(config.BatchSize),
- dal.Offset(offset),
- )
- if err != nil {
- return err
- }
- cols, err := rows.Columns()
- if err != nil {
- return err
- }
- for rows.Next() {
- row := make(map[string]interface{})
- columns := make([]interface{}, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- dataType := columnMap[cols[i]]
- if strings.HasPrefix(dataType, "array") {
- var arr []string
- columns[i] = &arr
- columnPointers[i] = pq.Array(&arr)
- } else {
- columnPointers[i] = &columns[i]
- }
+ err = func() error {
+ var rows dal.Rows
+ rows, err = db.Cursor(
+ dal.From(table),
+ dal.Orderby(orderBy),
+ dal.Limit(config.BatchSize),
+ dal.Offset(offset),
+ )
+ if err != nil {
+ return err
}
- err = rows.Scan(columnPointers...)
+ defer rows.Close()
+ cols, err := rows.Columns()
if err != nil {
return err
}
- for i, colName := range cols {
- row[colName] = columns[i]
+ for rows.Next() {
+ row := make(map[string]interface{})
+ columns := make([]interface{}, len(cols))
+ columnPointers := make([]interface{}, len(cols))
+ for i := range columns {
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array")
{
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] =
pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
+ }
+ }
+ err = rows.Scan(columnPointers...)
+ if err != nil {
+ return err
+ }
+ for i, colName := range cols {
+ row[colName] = columns[i]
+ }
+ data = append(data, row)
}
- data = append(data, row)
+ return nil
+ }()
+ if err != nil {
+ return err
}
if len(data) == 0 {
c.GetLogger().Warn(nil, "no data found in table %s
already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -406,6 +417,7 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starro
if err != nil {
return err
}
+ defer rows.Close()
var sourceCount int
for rows.Next() {
err = rows.Scan(&sourceCount)
@@ -413,13 +425,14 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starro
return err
}
}
- rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s",
starrocksTable))
+ rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from
%s", starrocksTable))
if err != nil {
return err
}
+ defer rowsStarRocks.Close()
var starrocksCount int
- for rows.Next() {
- err = rows.Scan(&starrocksCount)
+ for rowsStarRocks.Next() {
+ err = rowsStarRocks.Scan(&starrocksCount)
if err != nil {
return err
}