Hi Robert, It is in my first post in this thread. Basically, I want to know 
why all my logical processors are not being used  in my program. Thanks.

On Thursday, 18 June 2020 07:24:40 UTC+10, Robert Engels wrote:
>
> What is the question?
>
> On Jun 17, 2020, at 4:06 PM, envee <neeraj....@gmail.com <javascript:>> 
> wrote:
>
> Hi, Is anyone able to help me here ?
> Here is a (simplified) snippet of the code, in case it helps answering my 
> query. I basically create a goroutine for every input file (assume max 8) 
> and then wait for processing of all files to finish. Each goroutine 
> processes a line within the file and then any records which match a certain 
> criteria are appended to a slice. After all lines have been processed in a 
> file, the list is Sent to a channel. Finally, in the Closer goroutine, I 
> wait for all goroutines to finish and close the channel once all goroutines 
> have finished :
>
> package main
>
> import (
> "bufio"
> "compress/gzip"
> "flag"
> "fmt"
> "log"
> "os"
> "path/filepath"
> "strings"
> "sync"
> "github.com/en-vee/alog"
> )
>
> const (
> inputFilePrefix = "subscriber_db_"
> )
>
> var (
> inputDir              string
> )
>
> type QuarantineObject struct {
> objectType string
> id         string
> }
>
> func init() {
> flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to 
> be analysed")
> }
>
> func main() {
>
> var err error
> alog.SetLogLevel(alog.TRACE)
> flag.Parse()
>
> // Validation of input parameters
> if inputDir == "" {
> fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
> flag.Usage()
> os.Exit(1)
> }
>
> // Is the input directory valid ?
> if _, err := os.Stat(inputDir); os.IsNotExist(err) {
> fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
> flag.Usage()
> os.Exit(1)
> }
>
> // Determine all subscriber files by matching on the subscriber files 
> prefix
>
> inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", 
> inputDir, inputFilePrefix))
> if err != nil {
> fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
> os.Exit(1)
> }
>
> // Loop through all subscriber files
> // Make a goroutine for processing each file
> // Create a channel to receive the quarantined objects
> qObjChannel := make(chan []QuarantineObject, len(inputFileNames))
>
> //runtime.GOMAXPROCS(len(inputFileNames))
> var wg sync.WaitGroup
> for _, inputFileGz := range inputFileNames {
> wg.Add(1)
> go func(inputFileGz string) {
> nRecords := 0
>
> qObjList := make([]QuarantineObject, 0, 0)
> defer wg.Done()
> defer func() {
> alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", 
> inputFileGz, nRecords)
> }()
> // Open the file as a GZIP stream
>
> alog.Trace("==================================================================================================================================")
> alog.Trace("Processing Input File : %s", inputFileGz)
>
> alog.Trace("==================================================================================================================================")
>
> f, err := os.Open(inputFileGz)
> if err != nil {
> fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
> return
> }
> defer f.Close()
>
> fgz, err := gzip.NewReader(f)
> if err != nil {
> fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
> return
> }
> defer fgz.Close()
>
> scanner := bufio.NewScanner(fgz)
>
> // Iterate over all lines of the file and decode
>
> for scanner.Scan() {
> qObject := decodeLine()
> if qObject.IsQuarantined() {
> qObjList = append(qObjList, qObject)
> }
> }
> ///////////////////////////////////////////////////////
> // After all lines have been processed, Send to Channel
> ///////////////////////////////////////////////////////
> qObjChannel <- qObjList
> }(inputFileGz)
>
> }
>
> fmt.Println("Waiting for processing of all files to finish")
> ///////////////////////////////////////////////////////
> // Closer GoRoutine
> ///////////////////////////////////////////////////////
> go func() {
> wg.Wait()
> close(qObjChannel)
> fmt.Println("Quarantined Objects List")
> fmt.Println("------------------------")
> }()
>
> qFound := false
>
> for qObjList := range qObjChannel {
> for _, qObj := range qObjList {
> fmt.Println(qObj.id, "--->", qObj.objectType)
> qFound = true
> }
> }
>
> }
>
>
>
> On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>>
>> I am running a program which reads multiple gzipped input files and 
>> performs some processing on each line of the file. 
>> It creates 8 goroutines (1 per input file which is to be processed. the 
>> number of such files can be thought to remain 8 at the max).
>> Each of the go routines send to a buffered channel after finishing 
>> processing of their respective file.
>> After creating the go routines, the program waits (using WaitGroup) for 
>> all go routines to finish and also drain the channel for all the values 
>> sent by the go routines.
>>
>> I have an 4 core CPU with 2 threads per core = 8 logical cores.
>>
>> But I set GOMAXPROCS=4
>>
>> When I run the program with scheduler trace interval set to 1000ms, I can 
>> see the following :
>>
>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=0 runqueue=0 [0 0 0 1]
>> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=1 runqueue=0 [1 0 5 0]
>> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=1 runqueue=1 [0 0 1 0]
>> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
>> idlethreads=2 runqueue=0 [0 0 0 0]
>> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
>> idlethreads=2 runqueue=1 [0 0 0 4]
>>
>>
>> If I create 8 go routines, shouldn't they all be distributed equally 
>> among the 4 logical cores ?
>>
>> Why do some runqueues of the logical cores show values of 4 or 5 and some 
>> have values of 0 ?
>>
>> I was hoping to see something like which I according to my understanding 
>> means that all 4 processors have 1 go routine each waiting in the local 
>> runqueue and at the same time has 1 go routine running on the assigned OS 
>> Thread :
>>
>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=0 runqueue=0 [1 1 1 1]
>>
>> Thanks.
>>
> -- 
> You received this message because you are subscribed to the Google Groups 
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to golan...@googlegroups.com <javascript:>.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com
>  
> <https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/golang-nuts/e297ab2e-898a-4f95-b923-e34859ebcbeao%40googlegroups.com.

Reply via email to