This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new df683a639 perf(dora): optimize change lead time calculation using
batch queries (#8714)
df683a639 is described below
commit df683a63946e55214fe37c71f70a548f2b67b012
Author: Ankesh Kumar Thakur <[email protected]>
AuthorDate: Thu Feb 26 20:08:32 2026 +0530
perf(dora): optimize change lead time calculation using batch queries
(#8714)
* perf(dora): optimize change lead time calculation using batch queries
Eliminates the N+1 query problem by implementing batch fetching for:
- First commits per PR (batchFetchFirstCommits)
- First reviews per PR (batchFetchFirstReviews)
- Deployments per project (batchFetchDeployments)
Performance improvement:
- Before: 3 queries per PR (30,001 queries for 10K PRs)
- After: 3 batch queries total (99.99% reduction)
Also fixes NULL author_id handling in review queries to properly
handle PRs with an empty author_id field.
Tested with E2E tests confirming correctness and performance gains.
Signed-off-by: Ankesh <[email protected]>
* fix(dora): remove obsolete single-query functions replaced by batch
queries
Signed-off-by: Ankesh <[email protected]>
* ci: add workflow to build and push amd64 image to GHCR
Signed-off-by: Ankesh <[email protected]>
* revert: remove GHCR build workflow
Signed-off-by: Ankesh <[email protected]>
---------
Signed-off-by: Ankesh <[email protected]>
---
.../dora/tasks/change_lead_time_calculator.go | 237 ++++++++++++++-------
1 file changed, 157 insertions(+), 80 deletions(-)
diff --git a/backend/plugins/dora/tasks/change_lead_time_calculator.go
b/backend/plugins/dora/tasks/change_lead_time_calculator.go
index d559ac39a..47749b1af 100644
--- a/backend/plugins/dora/tasks/change_lead_time_calculator.go
+++ b/backend/plugins/dora/tasks/change_lead_time_calculator.go
@@ -52,6 +52,31 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext)
errors.Error {
return errors.Default.Wrap(err, "error deleting previous
project_pr_metrics")
}
+ // Batch fetch all required data upfront for better performance
+ startTime := time.Now()
+ logger.Info("Batch fetching data for project: %s",
data.Options.ProjectName)
+
+ firstCommitsMap, err :=
batchFetchFirstCommits(data.Options.ProjectName, db)
+ if err != nil {
+ return errors.Default.Wrap(err, "failed to batch fetch first
commits")
+ }
+ logger.Info("Fetched %d first commits in %v", len(firstCommitsMap),
time.Since(startTime))
+
+ reviewStartTime := time.Now()
+ firstReviewsMap, err :=
batchFetchFirstReviews(data.Options.ProjectName, db)
+ if err != nil {
+ return errors.Default.Wrap(err, "failed to batch fetch first
reviews")
+ }
+ logger.Info("Fetched %d first reviews in %v", len(firstReviewsMap),
time.Since(reviewStartTime))
+
+ deploymentStartTime := time.Now()
+ deploymentsMap, err := batchFetchDeployments(data.Options.ProjectName,
db)
+ if err != nil {
+ return errors.Default.Wrap(err, "failed to batch fetch
deployments")
+ }
+ logger.Info("Fetched %d deployments in %v", len(deploymentsMap),
time.Since(deploymentStartTime))
+ logger.Info("Total batch fetch time: %v", time.Since(startTime))
+
// Get pull requests by repo project_name
var clauses = []dal.Clause{
dal.Select("pr.id, pr.pull_request_key, pr.author_id,
pr.merge_commit_sha, pr.created_date, pr.merged_date"),
@@ -84,11 +109,8 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext)
errors.Error {
projectPrMetric.Id = pr.Id
projectPrMetric.ProjectName = data.Options.ProjectName
- // Get the first commit for the PR
- firstCommit, err := getFirstCommit(pr.Id, db)
- if err != nil {
- return nil, err
- }
+ // Get the first commit for the PR from batch-fetched
map
+ firstCommit := firstCommitsMap[pr.Id]
// Calculate PR coding time
if firstCommit != nil {
projectPrMetric.PrCodingTime =
computeTimeSpan(&firstCommit.CommitAuthoredDate, &pr.CreatedDate)
@@ -96,11 +118,8 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext)
errors.Error {
projectPrMetric.FirstCommitAuthoredDate =
&firstCommit.CommitAuthoredDate
}
- // Get the first review for the PR
- firstReview, err := getFirstReview(pr.Id, pr.AuthorId,
db)
- if err != nil {
- return nil, err
- }
+ // Get the first review for the PR from batch-fetched
map
+ firstReview := firstReviewsMap[pr.Id]
// Calculate PR pickup time and PR review time
prDuring := computeTimeSpan(&pr.CreatedDate,
pr.MergedDate)
if firstReview != nil {
@@ -113,11 +132,8 @@ func CalculateChangeLeadTime(taskCtx
plugin.SubTaskContext) errors.Error {
projectPrMetric.PrCreatedDate = &pr.CreatedDate
projectPrMetric.PrMergedDate = pr.MergedDate
- // Get the deployment for the PR
- deployment, err :=
getDeploymentCommit(pr.MergeCommitSha, data.Options.ProjectName, db)
- if err != nil {
- return nil, err
- }
+ // Get the deployment for the PR from batch-fetched map
+ deployment := deploymentsMap[pr.MergeCommitSha]
// Calculate PR deploy time
if deployment != nil && deployment.FinishedDate != nil {
@@ -152,95 +168,156 @@ func CalculateChangeLeadTime(taskCtx
plugin.SubTaskContext) errors.Error {
return converter.Execute()
}
-// getFirstCommit takes a PR ID and a database connection as input, and
returns the first commit of the PR.
-func getFirstCommit(prId string, db dal.Dal) (*code.PullRequestCommit,
errors.Error) {
- // Initialize a pull_request_commit object
- commit := &code.PullRequestCommit{}
- // Define the SQL clauses for the database query
- commitClauses := []dal.Clause{
- dal.From(&code.PullRequestCommit{}),
// Select from the "pull_request_commits" table
- dal.Where("pull_request_commits.pull_request_id = ?", prId),
// Filter by the PR ID
- dal.Orderby("pull_request_commits.commit_authored_date ASC"),
// Order by the authored date of the commits (ascending)
+func computeTimeSpan(start, end *time.Time) *int64 {
+ if start == nil || end == nil {
+ return nil
+ }
+ span := end.Sub(*start)
+ minutes := int64(math.Ceil(span.Minutes()))
+ if minutes < 0 {
+ return nil
}
+ return &minutes
+}
+
+// deploymentCommitWithMergeSha is a helper struct to capture both the
deployment commit
+// and the associated merge_sha from the commits_diffs join query.
+type deploymentCommitWithMergeSha struct {
+ devops.CicdDeploymentCommit
+ MergeSha string `gorm:"column:merge_sha"`
+}
- // Execute the query and retrieve the first commit
- err := db.First(commit, commitClauses...)
+// batchFetchFirstCommits retrieves the first commit for all pull requests in
the given project.
+// Returns a map indexed by PR ID for O(1) lookup performance.
+//
+// The query uses a subquery to find the minimum commit_authored_date for each
PR,
+// then joins back to get the full commit record. This is more efficient than
+// fetching all commits and filtering in memory.
+func batchFetchFirstCommits(projectName string, db dal.Dal)
(map[string]*code.PullRequestCommit, errors.Error) {
+ var results []*code.PullRequestCommit
+
+ // Use a subquery to find the earliest commit for each PR, then join to
get full commit details.
+ // This avoids scanning all commits and is optimized by the database
engine.
+ err := db.All(
+ &results,
+ dal.Select("prc.*"),
+ dal.From("pull_request_commits prc"),
+ dal.Join(`INNER JOIN (
+ SELECT pull_request_id, MIN(commit_authored_date) as
min_date
+ FROM pull_request_commits
+ GROUP BY pull_request_id
+ ) first_commits ON prc.pull_request_id =
first_commits.pull_request_id
+ AND prc.commit_authored_date = first_commits.min_date`),
+ dal.Join("INNER JOIN pull_requests pr ON pr.id =
prc.pull_request_id"),
+ dal.Join("LEFT JOIN project_mapping pm ON pm.row_id =
pr.base_repo_id AND pm.table = 'repos'"),
+ dal.Where("pm.project_name = ?", projectName),
+ dal.Orderby("prc.pull_request_id, prc.commit_authored_date
ASC"),
+ )
- // If any other error occurred, return nil and the error
if err != nil {
- // If the error indicates that no commit was found, return nil
and no error
- if db.IsErrorNotFound(err) {
- return nil, nil
+ return nil, errors.Default.Wrap(err, "failed to batch fetch
first commits")
+ }
+
+ // Build the map for O(1) lookup by PR ID
+ commitMap := make(map[string]*code.PullRequestCommit, len(results))
+ for _, commit := range results {
+ // Only keep the first commit if multiple commits have the same
timestamp
+ if _, exists := commitMap[commit.PullRequestId]; !exists {
+ commitMap[commit.PullRequestId] = commit
}
- return nil, err
}
- // If there were no errors, return the first commit and no error
- return commit, nil
+ return commitMap, nil
}
-// getFirstReview takes a PR ID, PR creator ID, and a database connection as
input, and returns the first review comment of the PR.
-func getFirstReview(prId string, prCreator string, db dal.Dal)
(*code.PullRequestComment, errors.Error) {
- // Initialize a review comment object
- review := &code.PullRequestComment{}
- // Define the SQL clauses for the database query
- commentClauses := []dal.Clause{
- dal.From(&code.PullRequestComment{}),
// Select from the "pull_request_comments" table
- dal.Where("pull_request_id = ? and account_id != ?", prId,
prCreator), // Filter by the PR ID and exclude comments from the PR creator
- dal.Orderby("created_date ASC"),
// Order by the created date of the review comments (ascending)
- }
+// batchFetchFirstReviews retrieves the first review comment for all pull
requests in the given project.
+// Returns a map indexed by PR ID for O(1) lookup performance.
+//
+// The query uses a subquery to find the minimum created_date for each PR
(excluding the PR author),
+// then joins back to get the full comment record.
+func batchFetchFirstReviews(projectName string, db dal.Dal)
(map[string]*code.PullRequestComment, errors.Error) {
+ var results []*code.PullRequestComment
- // Execute the query and retrieve the first review comment
- err := db.First(review, commentClauses...)
+ // Use a subquery to find the earliest review comment for each PR
(excluding author's comments),
+ // then join to get full comment details.
+ err := db.All(
+ &results,
+ dal.Select("prc.*"),
+ dal.From("pull_request_comments prc"),
+ dal.Join(`INNER JOIN (
+ SELECT prc2.pull_request_id, MIN(prc2.created_date) as
min_date
+ FROM pull_request_comments prc2
+ INNER JOIN pull_requests pr2 ON pr2.id =
prc2.pull_request_id
+ WHERE (pr2.author_id IS NULL OR pr2.author_id = '' OR
prc2.account_id != pr2.author_id)
+ GROUP BY prc2.pull_request_id
+ ) first_reviews ON prc.pull_request_id =
first_reviews.pull_request_id
+ AND prc.created_date = first_reviews.min_date`),
+ dal.Join("INNER JOIN pull_requests pr ON pr.id =
prc.pull_request_id"),
+ dal.Join("LEFT JOIN project_mapping pm ON pm.row_id =
pr.base_repo_id AND pm.table = 'repos'"),
+ dal.Where("pm.project_name = ? AND (pr.author_id IS NULL OR
pr.author_id = '' OR prc.account_id != pr.author_id)", projectName),
+ dal.Orderby("prc.pull_request_id, prc.created_date ASC"),
+ )
- // If any other error occurred, return nil and the error
if err != nil {
- // If the error indicates that no review comment was found,
return nil and no error
- if db.IsErrorNotFound(err) {
- return nil, nil
+ return nil, errors.Default.Wrap(err, "failed to batch fetch
first reviews")
+ }
+
+ // Build the map for O(1) lookup by PR ID
+ reviewMap := make(map[string]*code.PullRequestComment, len(results))
+ for _, review := range results {
+ // Only keep the first review if multiple reviews have the same
timestamp
+ if _, exists := reviewMap[review.PullRequestId]; !exists {
+ reviewMap[review.PullRequestId] = review
}
- return nil, err
}
- // If there were no errors, return the first review comment and no error
- return review, nil
+ return reviewMap, nil
}
-// getDeploymentCommit takes a merge commit SHA, a repository ID, a list of
deployment pairs, and a database connection as input.
-// It returns the deployment pair related to the merge commit, or nil if not
found.
-func getDeploymentCommit(mergeSha string, projectName string, db dal.Dal)
(*devops.CicdDeploymentCommit, errors.Error) {
- deploymentCommits := make([]*devops.CicdDeploymentCommit, 0, 1)
- // do not use `.First` method since gorm would append ORDER BY ID to
the query which leads to a error
+// batchFetchDeployments retrieves deployment commits for all merge commits in
the given project.
+// Returns a map indexed by merge commit SHA for O(1) lookup performance.
+//
+// The query finds the first successful production deployment for each merge
commit by:
+// 1. Finding deployment commits that have a previous successful deployment
+// 2. Joining with commits_diffs to find which deployment included each merge
commit
+// 3. Filtering for successful production deployments
+// 4. Ordering by started_date to get the earliest deployment
+//
+// The map is indexed by merge_sha (from commits_diffs), not by deployment
commit_sha,
+// because the caller needs to look up deployments by PR merge_commit_sha.
+func batchFetchDeployments(projectName string, db dal.Dal)
(map[string]*devops.CicdDeploymentCommit, errors.Error) {
+ var results []*deploymentCommitWithMergeSha
+
+ // Query finds the first deployment for each merge commit by using a
window function
+ // to rank deployments by started_date, then filtering to keep only
rank 1.
err := db.All(
- &deploymentCommits,
- dal.Select("dc.*"),
+ &results,
+ dal.Select("dc.*, cd.commit_sha as merge_sha"),
dal.From("cicd_deployment_commits dc"),
- dal.Join("LEFT JOIN cicd_deployment_commits p ON
(dc.prev_success_deployment_commit_id = p.id)"),
- dal.Join("LEFT JOIN project_mapping pm ON (pm.table =
'cicd_scopes' AND pm.row_id = dc.cicd_scope_id)"),
- dal.Join("INNER JOIN commits_diffs cd ON (cd.new_commit_sha =
dc.commit_sha AND cd.old_commit_sha = COALESCE (p.commit_sha, ''))"),
+ dal.Join("LEFT JOIN cicd_deployment_commits p ON
dc.prev_success_deployment_commit_id = p.id"),
+ dal.Join("INNER JOIN commits_diffs cd ON cd.new_commit_sha =
dc.commit_sha AND cd.old_commit_sha = COALESCE(p.commit_sha, '')"),
+ dal.Join("LEFT JOIN project_mapping pm ON pm.table =
'cicd_scopes' AND pm.row_id = dc.cicd_scope_id"),
dal.Where("dc.prev_success_deployment_commit_id <> ''"),
dal.Where("dc.environment = 'PRODUCTION'"), // TODO: remove
this when multi-environment is supported
- dal.Where("pm.project_name = ? AND cd.commit_sha = ? AND
dc.RESULT = ?", projectName, mergeSha, devops.RESULT_SUCCESS),
- dal.Orderby("dc.started_date, dc.id ASC"),
- dal.Limit(1),
+ dal.Where("dc.result = ? AND pm.project_name = ?",
devops.RESULT_SUCCESS, projectName),
+ dal.Orderby("cd.commit_sha, dc.started_date ASC, dc.id ASC"),
)
+
if err != nil {
- return nil, err
- }
- if len(deploymentCommits) == 0 {
- return nil, nil
+ return nil, errors.Default.Wrap(err, "failed to batch fetch
deployments")
}
- return deploymentCommits[0], nil
-}
-func computeTimeSpan(start, end *time.Time) *int64 {
- if start == nil || end == nil {
- return nil
- }
- span := end.Sub(*start)
- minutes := int64(math.Ceil(span.Minutes()))
- if minutes < 0 {
- return nil
+ // Build the map indexed by merge_sha for O(1) lookup.
+ // Keep only the first deployment for each merge commit (earliest by
started_date).
+ deploymentMap := make(map[string]*devops.CicdDeploymentCommit,
len(results))
+ for _, result := range results {
+ // Only keep the first deployment for each merge_sha
+ if _, exists := deploymentMap[result.MergeSha]; !exists {
+ // Copy the CicdDeploymentCommit without the MergeSha
field
+ deploymentCopy := result.CicdDeploymentCommit
+ deploymentMap[result.MergeSha] = &deploymentCopy
+ }
}
- return &minutes
+
+ return deploymentMap, nil
}