dosubot[bot] commented on issue #8098: URL: https://github.com/apache/incubator-devlake/issues/8098#issuecomment-2371316911
Hey there, @msarzijr! I'm here to help you with any bugs, questions, or contribution guidance you might need. Let's tackle this together!

I found a similar closed issue: [Feature][framework] Throttling total number of pipelines running in parallel. It was resolved by a pull request that runs pipelines in an independent process <sup>[[1]](https://github.com/apache/incubator-devlake/issues/2473)</sup>.

You want to keep the pipeline from getting stuck when the disk is full, and you want the cancel-pipeline command to stop the pipeline and clean the `/tmp` folder. You can take the following steps:

1. **Monitor disk space**: periodically check free space and pause the pipeline when it drops below a threshold.
2. **Clean the `/tmp` folder on cancel**: modify the `CancelPipeline` function so it cleans the `/tmp` folder when a pipeline is canceled.

Here is an example implementation.

### Monitor Disk Space

Add a function that checks free disk space and pauses the pipeline when necessary. `syscall.Statfs` is only available on Unix-like systems, and the pause logic itself is left as a placeholder:

```go
import (
	"log"
	"syscall"
	"time"
)

// checkDiskSpace reports whether the filesystem containing path has more than
// threshold bytes available to unprivileged users.
func checkDiskSpace(path string, threshold uint64) bool {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		log.Printf("Failed to get disk space: %v", err)
		// An unreadable filesystem is treated as "not enough space".
		return false
	}
	// Available blocks * size per block = available space in bytes
	available := stat.Bavail * uint64(stat.Bsize)
	return available > threshold
}

func monitorDiskSpace() {
	const threshold = 10 * 1024 * 1024 * 1024 // 10 GiB
	for {
		if !checkDiskSpace("/", threshold) {
			log.Println("Disk space is below threshold, pausing pipeline")
			// TODO: implement the logic to pause the pipeline
		}
		time.Sleep(1 * time.Minute) // check every minute
	}
}
```

### Clean `/tmp` Folder on Cancel

Modify the `CancelPipeline` function to clean the `/tmp` folder. `/tmp` normally has mode `1777` (world-writable with the sticky bit). Deleting it and recreating it with `os.Mkdir(tmpDir, 0755)` would lose that mode, so the helper below removes only the directory's contents:

```go
import (
	"os"
	"path/filepath"
)

// cleanTmpFolder removes everything inside /tmp but keeps the directory itself,
// so its ownership and permissions are preserved.
// Caution: on a shared host this also deletes files owned by other processes.
func cleanTmpFolder() error {
	const tmpDir = "/tmp"
	entries, err := os.ReadDir(tmpDir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if err := os.RemoveAll(filepath.Join(tmpDir, entry.Name())); err != nil {
			return err
		}
	}
	return nil
}

// The remaining identifiers (db, dal, models, errors, GetTasks, CancelTask)
// are assumed to come from the existing pipeline service package.
func CancelPipeline(pipelineId uint64) errors.Error {
	// prevent RunPipelineInQueue from consuming pending pipelines
	pipeline := &models.Pipeline{}
	err := db.First(pipeline, dal.Where("id = ?", pipelineId))
	if err != nil {
		return errors.BadInput.New("pipeline not found")
	}
	if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
		pipeline.Status = models.TASK_CANCELLED
		err = db.Update(pipeline)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline")
		}
		// now, with RunPipelineInQueue being blocked and the target pipeline updated,
		// we should update the related tasks as well
		err = db.UpdateColumn(
			&models.Task{},
			"status", models.TASK_CANCELLED,
			dal.Where("pipeline_id = ?", pipelineId),
		)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline tasks")
		}
		// Clean the /tmp folder
		if err := cleanTmpFolder(); err != nil {
			return errors.Default.Wrap(err, "failed to clean /tmp folder")
		}
		// the target pipeline is pending, not running, so no actual cancel operation is needed
		return nil
	}
	pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
	if err != nil {
		return errors.Convert(err)
	}
	if count == 0 {
		return nil
	}
	for _, pendingTask := range pendingTasks {
		_ = CancelTask(pendingTask.ID)
	}
	// Clean the /tmp folder
	if err := cleanTmpFolder(); err != nil {
		return errors.Default.Wrap(err, "failed to clean /tmp folder")
	}
	return errors.Convert(err)
}
```

### Run Disk Space Monitor

Start the disk space monitor in the `pipelineServiceInit` function, next to the pipeline queue runner:

```go
func pipelineServiceInit() {
	// initialize plugin
	plugin.InitPlugins(basicRes)

	// notification
	var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
	var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
	if strings.TrimSpace(notificationEndpoint) != "" {
		defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
	}

	// standalone mode: reset pipeline status
	if cfg.GetBool("RESUME_PIPELINES") {
		markInterruptedPipelineAs(models.TASK_RESUME)
	} else {
		markInterruptedPipelineAs(models.TASK_FAILED)
	}

	// load cronjobs for blueprints
	errors.Must(ReloadBlueprints())

	var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
	if pipelineMaxParallel < 0 {
		panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
	}
	if pipelineMaxParallel == 0 {
		globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
		pipelineMaxParallel = 10000
	}
	// run pipeline with independent goroutine
	go RunPipelineInQueue(pipelineMaxParallel)

	// Start disk space monitor
	go monitorDiskSpace()
}
```

Once the pause logic is filled in, this setup pauses the pipeline when disk space runs low and cleans the `/tmp` folder when a pipeline is canceled <sup>[[2]](https://github.com/apache/incubator-devlake/blob/main/backend/server/services/pipeline.go)</sup>.
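The `monitorDiskSpace` loop leaves the actual pause as a placeholder. One minimal way to fill it in is a small "gate" that the monitor closes when space is low and that the queue runner waits on before starting the next pipeline. This sketch assumes that hook point. The `diskGate` type and its `SetLow`, `WaitUntilClear`, and `Paused` methods are hypothetical and not part of DevLake. It holds back new pipelines only and does not interrupt one that is already running.

```go
package main

import (
	"fmt"
	"sync"
)

// diskGate is a hypothetical pause switch: the disk monitor sets it when free
// space is low, and the pipeline queue waits on it before starting new work.
type diskGate struct {
	mu   sync.Mutex
	cond *sync.Cond
	low  bool
}

func newDiskGate() *diskGate {
	g := &diskGate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// SetLow records the latest disk check; clearing the flag wakes every waiter.
func (g *diskGate) SetLow(low bool) {
	g.mu.Lock()
	g.low = low
	g.mu.Unlock()
	if !low {
		g.cond.Broadcast()
	}
}

// WaitUntilClear blocks for as long as disk space is flagged as low.
func (g *diskGate) WaitUntilClear() {
	g.mu.Lock()
	for g.low {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// Paused reports the current state without blocking.
func (g *diskGate) Paused() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.low
}

func main() {
	gate := newDiskGate()
	gate.SetLow(true) // monitor: disk below threshold

	done := make(chan struct{})
	go func() {
		gate.WaitUntilClear() // a queue runner would block here
		close(done)
	}()

	fmt.Println("paused:", gate.Paused()) // paused: true
	gate.SetLow(false)                    // monitor: enough space again
	<-done
	fmt.Println("paused:", gate.Paused()) // paused: false
}
```

With a shared gate, the placeholder in `monitorDiskSpace` could become `gate.SetLow(!checkDiskSpace("/", threshold))`. `RunPipelineInQueue` would then call `gate.WaitUntilClear()` before it picks up the next pending pipeline. Where exactly that call belongs inside the real queue runner is an assumption. A condition variable is used instead of a polling loop, so waiting pipelines resume as soon as the next disk check reports enough space.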