This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new df683a639 perf(dora): optimize change lead time calculation using 
batch queries (#8714)
df683a639 is described below

commit df683a63946e55214fe37c71f70a548f2b67b012
Author: Ankesh Kumar Thakur <[email protected]>
AuthorDate: Thu Feb 26 20:08:32 2026 +0530

    perf(dora): optimize change lead time calculation using batch queries 
(#8714)
    
    * perf(dora): optimize change lead time calculation using batch queries
    
    Eliminates N+1 query problem by implementing batch fetching for:
    - First commits per PR (batchFetchFirstCommits)
    - First reviews per PR (batchFetchFirstReviews)
    - Deployments per project (batchFetchDeployments)
    
    Performance improvement:
    - Before: 3 queries per PR (30,001 queries for 10K PRs)
    - After: 3 batch queries plus the single PR listing query (99.99% reduction)
    
    Also fixes NULL author_id handling in review queries to properly
    handle PRs with empty author_id field.
    
    Tested with E2E tests confirming correctness and performance gains.
    
    Signed-off-by: Ankesh <[email protected]>
    
    * fix(dora): remove obsolete single-query functions replaced by batch 
queries
    
    Signed-off-by: Ankesh <[email protected]>
    
    * ci: add workflow to build and push amd64 image to GHCR
    
    Signed-off-by: Ankesh <[email protected]>
    
    * revert: remove GHCR build workflow
    
    Signed-off-by: Ankesh <[email protected]>
    
    ---------
    
    Signed-off-by: Ankesh <[email protected]>
---
 .../dora/tasks/change_lead_time_calculator.go      | 237 ++++++++++++++-------
 1 file changed, 157 insertions(+), 80 deletions(-)

diff --git a/backend/plugins/dora/tasks/change_lead_time_calculator.go 
b/backend/plugins/dora/tasks/change_lead_time_calculator.go
index d559ac39a..47749b1af 100644
--- a/backend/plugins/dora/tasks/change_lead_time_calculator.go
+++ b/backend/plugins/dora/tasks/change_lead_time_calculator.go
@@ -52,6 +52,31 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) 
errors.Error {
                return errors.Default.Wrap(err, "error deleting previous 
project_pr_metrics")
        }
 
+       // Batch fetch all required data upfront for better performance
+       startTime := time.Now()
+       logger.Info("Batch fetching data for project: %s", 
data.Options.ProjectName)
+
+       firstCommitsMap, err := 
batchFetchFirstCommits(data.Options.ProjectName, db)
+       if err != nil {
+               return errors.Default.Wrap(err, "failed to batch fetch first 
commits")
+       }
+       logger.Info("Fetched %d first commits in %v", len(firstCommitsMap), 
time.Since(startTime))
+
+       reviewStartTime := time.Now()
+       firstReviewsMap, err := 
batchFetchFirstReviews(data.Options.ProjectName, db)
+       if err != nil {
+               return errors.Default.Wrap(err, "failed to batch fetch first 
reviews")
+       }
+       logger.Info("Fetched %d first reviews in %v", len(firstReviewsMap), 
time.Since(reviewStartTime))
+
+       deploymentStartTime := time.Now()
+       deploymentsMap, err := batchFetchDeployments(data.Options.ProjectName, 
db)
+       if err != nil {
+               return errors.Default.Wrap(err, "failed to batch fetch 
deployments")
+       }
+       logger.Info("Fetched %d deployments in %v", len(deploymentsMap), 
time.Since(deploymentStartTime))
+       logger.Info("Total batch fetch time: %v", time.Since(startTime))
+
        // Get pull requests by repo project_name
        var clauses = []dal.Clause{
                dal.Select("pr.id, pr.pull_request_key, pr.author_id, 
pr.merge_commit_sha, pr.created_date, pr.merged_date"),
@@ -84,11 +109,8 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) 
errors.Error {
                        projectPrMetric.Id = pr.Id
                        projectPrMetric.ProjectName = data.Options.ProjectName
 
-                       // Get the first commit for the PR
-                       firstCommit, err := getFirstCommit(pr.Id, db)
-                       if err != nil {
-                               return nil, err
-                       }
+                       // Get the first commit for the PR from batch-fetched 
map
+                       firstCommit := firstCommitsMap[pr.Id]
                        // Calculate PR coding time
                        if firstCommit != nil {
                                projectPrMetric.PrCodingTime = 
computeTimeSpan(&firstCommit.CommitAuthoredDate, &pr.CreatedDate)
@@ -96,11 +118,8 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) 
errors.Error {
                                projectPrMetric.FirstCommitAuthoredDate = 
&firstCommit.CommitAuthoredDate
                        }
 
-                       // Get the first review for the PR
-                       firstReview, err := getFirstReview(pr.Id, pr.AuthorId, 
db)
-                       if err != nil {
-                               return nil, err
-                       }
+                       // Get the first review for the PR from batch-fetched 
map
+                       firstReview := firstReviewsMap[pr.Id]
                        // Calculate PR pickup time and PR review time
                        prDuring := computeTimeSpan(&pr.CreatedDate, 
pr.MergedDate)
                        if firstReview != nil {
@@ -113,11 +132,8 @@ func CalculateChangeLeadTime(taskCtx 
plugin.SubTaskContext) errors.Error {
                        projectPrMetric.PrCreatedDate = &pr.CreatedDate
                        projectPrMetric.PrMergedDate = pr.MergedDate
 
-                       // Get the deployment for the PR
-                       deployment, err := 
getDeploymentCommit(pr.MergeCommitSha, data.Options.ProjectName, db)
-                       if err != nil {
-                               return nil, err
-                       }
+                       // Get the deployment for the PR from batch-fetched map
+                       deployment := deploymentsMap[pr.MergeCommitSha]
 
                        // Calculate PR deploy time
                        if deployment != nil && deployment.FinishedDate != nil {
@@ -152,95 +168,156 @@ func CalculateChangeLeadTime(taskCtx 
plugin.SubTaskContext) errors.Error {
        return converter.Execute()
 }
 
-// getFirstCommit takes a PR ID and a database connection as input, and 
returns the first commit of the PR.
-func getFirstCommit(prId string, db dal.Dal) (*code.PullRequestCommit, 
errors.Error) {
-       // Initialize a pull_request_commit object
-       commit := &code.PullRequestCommit{}
-       // Define the SQL clauses for the database query
-       commitClauses := []dal.Clause{
-               dal.From(&code.PullRequestCommit{}),                          
// Select from the "pull_request_commits" table
-               dal.Where("pull_request_commits.pull_request_id = ?", prId),  
// Filter by the PR ID
-               dal.Orderby("pull_request_commits.commit_authored_date ASC"), 
// Order by the authored date of the commits (ascending)
+func computeTimeSpan(start, end *time.Time) *int64 {
+       if start == nil || end == nil {
+               return nil
+       }
+       span := end.Sub(*start)
+       minutes := int64(math.Ceil(span.Minutes()))
+       if minutes < 0 {
+               return nil
        }
+       return &minutes
+}
+
+// deploymentCommitWithMergeSha is a helper struct to capture both the 
deployment commit
+// and the associated merge_sha from the commits_diffs join query.
+type deploymentCommitWithMergeSha struct {
+       devops.CicdDeploymentCommit
+       MergeSha string `gorm:"column:merge_sha"`
+}
 
-       // Execute the query and retrieve the first commit
-       err := db.First(commit, commitClauses...)
+// batchFetchFirstCommits retrieves the first commit for all pull requests in 
the given project.
+// Returns a map indexed by PR ID for O(1) lookup performance.
+//
+// The query uses a subquery to find the minimum commit_authored_date for each 
PR,
+// then joins back to get the full commit record. This is more efficient than
+// fetching all commits and filtering in memory.
+func batchFetchFirstCommits(projectName string, db dal.Dal) 
(map[string]*code.PullRequestCommit, errors.Error) {
+       var results []*code.PullRequestCommit
+
+       // Use a subquery to find the earliest commit for each PR, then join to 
get full commit details.
+       // Only the outer query filters by project; the subquery aggregates over 
every row of pull_request_commits.
+       err := db.All(
+               &results,
+               dal.Select("prc.*"),
+               dal.From("pull_request_commits prc"),
+               dal.Join(`INNER JOIN (
+                       SELECT pull_request_id, MIN(commit_authored_date) as 
min_date
+                       FROM pull_request_commits
+                       GROUP BY pull_request_id
+               ) first_commits ON prc.pull_request_id = 
first_commits.pull_request_id
+               AND prc.commit_authored_date = first_commits.min_date`),
+               dal.Join("INNER JOIN pull_requests pr ON pr.id = 
prc.pull_request_id"),
+               dal.Join("LEFT JOIN project_mapping pm ON pm.row_id = 
pr.base_repo_id AND pm.table = 'repos'"),
+               dal.Where("pm.project_name = ?", projectName),
+               dal.Orderby("prc.pull_request_id, prc.commit_authored_date 
ASC"),
+       )
 
-       // If any other error occurred, return nil and the error
        if err != nil {
-               // If the error indicates that no commit was found, return nil 
and no error
-               if db.IsErrorNotFound(err) {
-                       return nil, nil
+               return nil, errors.Default.Wrap(err, "failed to batch fetch 
first commits")
+       }
+
+       // Build the map for O(1) lookup by PR ID
+       commitMap := make(map[string]*code.PullRequestCommit, len(results))
+       for _, commit := range results {
+               // Only keep the first commit if multiple commits have the same 
timestamp
+               if _, exists := commitMap[commit.PullRequestId]; !exists {
+                       commitMap[commit.PullRequestId] = commit
                }
-               return nil, err
        }
 
-       // If there were no errors, return the first commit and no error
-       return commit, nil
+       return commitMap, nil
 }
 
-// getFirstReview takes a PR ID, PR creator ID, and a database connection as 
input, and returns the first review comment of the PR.
-func getFirstReview(prId string, prCreator string, db dal.Dal) 
(*code.PullRequestComment, errors.Error) {
-       // Initialize a review comment object
-       review := &code.PullRequestComment{}
-       // Define the SQL clauses for the database query
-       commentClauses := []dal.Clause{
-               dal.From(&code.PullRequestComment{}),                           
       // Select from the "pull_request_comments" table
-               dal.Where("pull_request_id = ? and account_id != ?", prId, 
prCreator), // Filter by the PR ID and exclude comments from the PR creator
-               dal.Orderby("created_date ASC"),                                
       // Order by the created date of the review comments (ascending)
-       }
+// batchFetchFirstReviews retrieves the first review comment for all pull 
requests in the given project.
+// Returns a map indexed by PR ID for O(1) lookup performance.
+//
+// The query uses a subquery to find the minimum created_date for each PR 
(excluding the PR author),
+// then joins back to get the full comment record.
+func batchFetchFirstReviews(projectName string, db dal.Dal) 
(map[string]*code.PullRequestComment, errors.Error) {
+       var results []*code.PullRequestComment
 
-       // Execute the query and retrieve the first review comment
-       err := db.First(review, commentClauses...)
+       // Use a subquery to find the earliest review comment for each PR 
(excluding author's comments),
+       // then join to get full comment details.
+       err := db.All(
+               &results,
+               dal.Select("prc.*"),
+               dal.From("pull_request_comments prc"),
+               dal.Join(`INNER JOIN (
+                       SELECT prc2.pull_request_id, MIN(prc2.created_date) as 
min_date
+                       FROM pull_request_comments prc2
+                       INNER JOIN pull_requests pr2 ON pr2.id = 
prc2.pull_request_id
+                       WHERE (pr2.author_id IS NULL OR pr2.author_id = '' OR 
prc2.account_id != pr2.author_id)
+                       GROUP BY prc2.pull_request_id
+               ) first_reviews ON prc.pull_request_id = 
first_reviews.pull_request_id
+               AND prc.created_date = first_reviews.min_date`),
+               dal.Join("INNER JOIN pull_requests pr ON pr.id = 
prc.pull_request_id"),
+               dal.Join("LEFT JOIN project_mapping pm ON pm.row_id = 
pr.base_repo_id AND pm.table = 'repos'"),
+               dal.Where("pm.project_name = ? AND (pr.author_id IS NULL OR 
pr.author_id = '' OR prc.account_id != pr.author_id)", projectName),
+               dal.Orderby("prc.pull_request_id, prc.created_date ASC"),
+       )
 
-       // If any other error occurred, return nil and the error
        if err != nil {
-               // If the error indicates that no review comment was found, 
return nil and no error
-               if db.IsErrorNotFound(err) {
-                       return nil, nil
+               return nil, errors.Default.Wrap(err, "failed to batch fetch 
first reviews")
+       }
+
+       // Build the map for O(1) lookup by PR ID
+       reviewMap := make(map[string]*code.PullRequestComment, len(results))
+       for _, review := range results {
+               // Only keep the first review if multiple reviews have the same 
timestamp
+               if _, exists := reviewMap[review.PullRequestId]; !exists {
+                       reviewMap[review.PullRequestId] = review
                }
-               return nil, err
        }
 
-       // If there were no errors, return the first review comment and no error
-       return review, nil
+       return reviewMap, nil
 }
 
-// getDeploymentCommit takes a merge commit SHA, a repository ID, a list of 
deployment pairs, and a database connection as input.
-// It returns the deployment pair related to the merge commit, or nil if not 
found.
-func getDeploymentCommit(mergeSha string, projectName string, db dal.Dal) 
(*devops.CicdDeploymentCommit, errors.Error) {
-       deploymentCommits := make([]*devops.CicdDeploymentCommit, 0, 1)
-       // do not use `.First` method since gorm would append ORDER BY ID to 
the query which leads to a error
+// batchFetchDeployments retrieves deployment commits for all merge commits in 
the given project.
+// Returns a map indexed by merge commit SHA for O(1) lookup performance.
+//
+// The query finds the first successful production deployment for each merge 
commit by:
+// 1. Finding deployment commits that have a previous successful deployment
+// 2. Joining with commits_diffs to find which deployment included each merge 
commit
+// 3. Filtering for successful production deployments
+// 4. Ordering by started_date to get the earliest deployment
+//
+// The map is indexed by merge_sha (from commits_diffs), not by deployment 
commit_sha,
+// because the caller needs to look up deployments by PR merge_commit_sha.
+func batchFetchDeployments(projectName string, db dal.Dal) 
(map[string]*devops.CicdDeploymentCommit, errors.Error) {
+       var results []*deploymentCommitWithMergeSha
+
+       // Query returns every matching deployment ordered by merge commit SHA, 
then by started_date and id;
+       // the map-building loop below keeps only the first (earliest) row per 
merge commit.
        err := db.All(
-               &deploymentCommits,
-               dal.Select("dc.*"),
+               &results,
+               dal.Select("dc.*, cd.commit_sha as merge_sha"),
                dal.From("cicd_deployment_commits dc"),
-               dal.Join("LEFT JOIN cicd_deployment_commits p ON 
(dc.prev_success_deployment_commit_id = p.id)"),
-               dal.Join("LEFT JOIN project_mapping pm ON (pm.table = 
'cicd_scopes' AND pm.row_id = dc.cicd_scope_id)"),
-               dal.Join("INNER JOIN commits_diffs cd ON (cd.new_commit_sha = 
dc.commit_sha AND cd.old_commit_sha = COALESCE (p.commit_sha, ''))"),
+               dal.Join("LEFT JOIN cicd_deployment_commits p ON 
dc.prev_success_deployment_commit_id = p.id"),
+               dal.Join("INNER JOIN commits_diffs cd ON cd.new_commit_sha = 
dc.commit_sha AND cd.old_commit_sha = COALESCE(p.commit_sha, '')"),
+               dal.Join("LEFT JOIN project_mapping pm ON pm.table = 
'cicd_scopes' AND pm.row_id = dc.cicd_scope_id"),
                dal.Where("dc.prev_success_deployment_commit_id <> ''"),
                dal.Where("dc.environment = 'PRODUCTION'"), // TODO: remove 
this when multi-environment is supported
-               dal.Where("pm.project_name = ? AND cd.commit_sha = ? AND 
dc.RESULT = ?", projectName, mergeSha, devops.RESULT_SUCCESS),
-               dal.Orderby("dc.started_date, dc.id ASC"),
-               dal.Limit(1),
+               dal.Where("dc.result = ? AND pm.project_name = ?", 
devops.RESULT_SUCCESS, projectName),
+               dal.Orderby("cd.commit_sha, dc.started_date ASC, dc.id ASC"),
        )
+
        if err != nil {
-               return nil, err
-       }
-       if len(deploymentCommits) == 0 {
-               return nil, nil
+               return nil, errors.Default.Wrap(err, "failed to batch fetch 
deployments")
        }
-       return deploymentCommits[0], nil
-}
 
-func computeTimeSpan(start, end *time.Time) *int64 {
-       if start == nil || end == nil {
-               return nil
-       }
-       span := end.Sub(*start)
-       minutes := int64(math.Ceil(span.Minutes()))
-       if minutes < 0 {
-               return nil
+       // Build the map indexed by merge_sha for O(1) lookup.
+       // Keep only the first deployment for each merge commit (earliest by 
started_date).
+       deploymentMap := make(map[string]*devops.CicdDeploymentCommit, 
len(results))
+       for _, result := range results {
+               // Only keep the first deployment for each merge_sha
+               if _, exists := deploymentMap[result.MergeSha]; !exists {
+                       // Copy the CicdDeploymentCommit without the MergeSha 
field
+                       deploymentCopy := result.CicdDeploymentCommit
+                       deploymentMap[result.MergeSha] = &deploymentCopy
+               }
        }
-       return &minutes
+
+       return deploymentMap, nil
 }

Reply via email to