dosubot[bot] commented on issue #8098:
URL: 
https://github.com/apache/incubator-devlake/issues/8098#issuecomment-2371316911

   Hey there, @msarzijr! I'm here to help you with any bugs, questions, or 
contribution guidance you might need. Let's tackle this together!
   
   I found a similar closed issue: [Feature][framework] Throttling total number 
of pipelines running in parallel, which was resolved by a pull request that 
implemented running pipelines in an independent process 
<sup>[[1]](https://github.com/apache/incubator-devlake/issues/2473)</sup>.
   
   To keep the pipeline from getting stuck when the disk is full, and to make 
the cancel pipeline command stop the pipeline and clean the `/tmp` folder, you 
can implement the following steps:
   
   1. **Monitor Disk Space**: Implement a function to monitor disk space and 
pause the pipeline if the disk space is below a certain threshold.
   2. **Clean `/tmp` Folder on Cancel**: Modify the `CancelPipeline` function 
to clean the `/tmp` folder when a pipeline is canceled.
   
   Here is an example implementation:
   
   ### Monitor Disk Space
   
   Add a function to check disk space and pause the pipeline if necessary:
   
   ```go
   import (
        "log"
        "syscall"
        "time"
   )
   
   func checkDiskSpace(path string, threshold uint64) bool {
        var stat syscall.Statfs_t
        err := syscall.Statfs(path, &stat)
        if err != nil {
                log.Printf("Failed to get disk space: %v", err)
                return false
        }
        // Available blocks * size per block = available space in bytes
        available := stat.Bavail * uint64(stat.Bsize)
        return available > threshold
   }
   
   func monitorDiskSpace() {
        const threshold = 10 * 1024 * 1024 * 1024 // 10 GB
        for {
                if !checkDiskSpace("/", threshold) {
                        log.Println("Disk space is below threshold, pausing pipeline")
                        // Pause the pipeline
                        // Implement the logic to pause the pipeline
                }
                time.Sleep(1 * time.Minute) // Check every minute
        }
   }
   ```
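
The `// Pause the pipeline` placeholder above leaves the actual mechanism open. One possible shape, sketched here under assumptions, is a gate that the monitor closes when space runs low and that workers wait on before picking up new work. The `pauseGate` type and its methods are hypothetical names, not part of DevLake, and how `RunPipelineInQueue` would consult such a gate is left open.

```go
package main

import "sync"

// pauseGate is a hypothetical helper (not a DevLake type). The disk monitor
// calls Pause/Resume; workers call Wait before starting new work.
type pauseGate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newPauseGate() *pauseGate {
	g := &pauseGate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause makes subsequent Wait calls block until Resume is called.
func (g *pauseGate) Pause() {
	g.mu.Lock()
	g.paused = true
	g.mu.Unlock()
}

// Resume releases every goroutine currently blocked in Wait.
func (g *pauseGate) Resume() {
	g.mu.Lock()
	g.paused = false
	g.mu.Unlock()
	g.cond.Broadcast()
}

// Wait blocks while the gate is paused and returns immediately otherwise.
func (g *pauseGate) Wait() {
	g.mu.Lock()
	for g.paused {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// Paused reports whether the gate is currently closed.
func (g *pauseGate) Paused() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.paused
}

// update applies one disk-space reading: the gate stays open only while
// available space is strictly above the threshold, matching checkDiskSpace.
func (g *pauseGate) update(availableBytes, thresholdBytes uint64) {
	if availableBytes > thresholdBytes {
		g.Resume()
	} else {
		g.Pause()
	}
}
```

With a gate like this, the low-space branch of `monitorDiskSpace` could call `Pause()` and the healthy branch `Resume()`, while the queue loop calls `Wait()` before it dequeues the next pipeline. Pipelines that are already running are not interrupted by this design.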
   
   ### Clean `/tmp` Folder on Cancel
   
   Modify the `CancelPipeline` function to clean the `/tmp` folder:
   
   ```go
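   import (
        "os"
        "path/filepath"
   )
   
   // cleanTmpFolder removes the entries inside /tmp but keeps the directory
   // itself, so its ownership and sticky-bit permissions are preserved.
   // Caution: this still deletes files that belong to other processes.
   func cleanTmpFolder() error {
        tmpDir := "/tmp"
        entries, err := os.ReadDir(tmpDir)
        if err != nil {
                return err
        }
        for _, entry := range entries {
                if err := os.RemoveAll(filepath.Join(tmpDir, entry.Name())); err != nil {
                        return err
                }
        }
        return nil
   }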
   
   func CancelPipeline(pipelineId uint64) errors.Error {
        // prevent RunPipelineInQueue from consuming pending pipelines
        pipeline := &models.Pipeline{}
        err := db.First(pipeline, dal.Where("id = ?", pipelineId))
        if err != nil {
                return errors.BadInput.New("pipeline not found")
        }
        if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
                pipeline.Status = models.TASK_CANCELLED
                err = db.Update(pipeline)
                if err != nil {
                        return errors.Default.Wrap(err, "failed to update pipeline")
                }
                // now, with RunPipelineInQueue being blocked and the target pipeline updated,
                // we should update the related tasks as well
                err = db.UpdateColumn(
                        &models.Task{},
                        "status", models.TASK_CANCELLED,
                        dal.Where("pipeline_id = ?", pipelineId),
                )
                if err != nil {
                        return errors.Default.Wrap(err, "failed to update pipeline tasks")
                }
                // Clean the /tmp folder
                if err := cleanTmpFolder(); err != nil {
                        return errors.Default.Wrap(err, "failed to clean /tmp folder")
                }
                // the target pipeline is pending, not running, so there is no need to
                // perform the actual cancel operation
                return nil
        }
        pendingTasks, count, err := GetTasks(&TaskQuery{
                PipelineId: pipelineId,
                Pending:    1,
                Pagination: Pagination{PageSize: -1},
        })
        if err != nil {
                return errors.Convert(err)
        }
        if count == 0 {
                return nil
        }
        for _, pendingTask := range pendingTasks {
                _ = CancelTask(pendingTask.ID)
        }
        // Clean the /tmp folder
        if err := cleanTmpFolder(); err != nil {
                return errors.Default.Wrap(err, "failed to clean /tmp folder")
        }
        return errors.Convert(err)
   }
   ```
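
Wiping all of `/tmp` also deletes files owned by other processes and by pipelines that are still running. A narrower alternative, sketched below, gives each pipeline its own scratch directory and removes only that directory on cancel. It assumes pipeline code can be pointed at such a directory; the `devlake-pipeline-<id>` naming and all helper functions here are hypothetical and do not exist in DevLake.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// tmpPrefix is a hypothetical naming convention for per-pipeline scratch dirs.
const tmpPrefix = "devlake-pipeline-"

// pipelineTmpDir returns the scratch directory path for one pipeline.
func pipelineTmpDir(base string, pipelineId uint64) string {
	return filepath.Join(base, fmt.Sprintf("%s%d", tmpPrefix, pipelineId))
}

// preparePipelineTmp creates the scratch directory (and any missing parents)
// with owner-only permissions and returns its path.
func preparePipelineTmp(base string, pipelineId uint64) (string, error) {
	dir := pipelineTmpDir(base, pipelineId)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return "", err
	}
	return dir, nil
}

// cleanPipelineTmp removes only the given pipeline's scratch directory.
// It returns nil if the directory does not exist.
func cleanPipelineTmp(base string, pipelineId uint64) error {
	return os.RemoveAll(pipelineTmpDir(base, pipelineId))
}

// listPipelineTmp returns the scratch directories currently present under
// base, which helps to spot leftovers from interrupted pipelines.
func listPipelineTmp(base string) ([]string, error) {
	return filepath.Glob(filepath.Join(base, tmpPrefix+"*"))
}
```

In `CancelPipeline`, the two `cleanTmpFolder()` calls could then be replaced by something like `cleanPipelineTmp(os.TempDir(), pipelineId)`, so that cancelling one pipeline never touches the files of another.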
   
   ### Run Disk Space Monitor
   
   Start the disk space monitor in the `pipelineServiceInit` function:
   
   ```go
   func pipelineServiceInit() {
        // initialize plugin
        plugin.InitPlugins(basicRes)
   
        // notification
        var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
        var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
        if strings.TrimSpace(notificationEndpoint) != "" {
                defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
        }
   
        // standalone mode: reset pipeline status
        if cfg.GetBool("RESUME_PIPELINES") {
                markInterruptedPipelineAs(models.TASK_RESUME)
        } else {
                markInterruptedPipelineAs(models.TASK_FAILED)
        }
   
        // load cronjobs for blueprints
        errors.Must(ReloadBlueprints())
   
        var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
        if pipelineMaxParallel < 0 {
                panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
        }
        if pipelineMaxParallel == 0 {
                globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
                pipelineMaxParallel = 10000
        }
        // run pipeline with independent goroutine
        go RunPipelineInQueue(pipelineMaxParallel)
   
        // Start disk space monitor
        go monitorDiskSpace()
   }
   ```
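
As written, `monitorDiskSpace` loops forever and cannot be stopped on shutdown or in tests. A minimal sketch of a cancellable variant follows, using `context` and `time.Ticker`; `runMonitor` and `startMonitor` are hypothetical names, and the `check` and `onLow` callbacks stand in for `checkDiskSpace` and whatever pause logic is chosen.

```go
package main

import (
	"context"
	"time"
)

// runMonitor calls check once per interval until ctx is cancelled.
// onLow is invoked each time check reports insufficient space (false).
func runMonitor(ctx context.Context, interval time.Duration, check func() bool, onLow func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !check() {
				onLow()
			}
		}
	}
}

// startMonitor runs runMonitor in its own goroutine and returns a stop
// function that cancels the loop and waits for the goroutine to exit.
func startMonitor(interval time.Duration, check func() bool, onLow func()) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		runMonitor(ctx, interval, check, onLow)
	}()
	return func() {
		cancel()
		<-done
	}
}
```

In `pipelineServiceInit`, `go monitorDiskSpace()` could become a `startMonitor(time.Minute, ...)` call whose returned stop function is invoked during service shutdown.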
   
   With these changes, low disk space is detected and logged (the pause logic 
itself is still a placeholder to fill in), and the `/tmp` folder is cleaned when 
a pipeline is canceled 
<sup>[[2]](https://github.com/apache/incubator-devlake/blob/main/backend/server/services/pipeline.go)</sup>.
   
   
   *To continue talking to [Dosu](https://dosu.dev), mention @dosu.*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

