What is the question?

> On Jun 17, 2020, at 4:06 PM, envee <neeraj.vaidy...@gmail.com> wrote:
> 
> Hi, Is anyone able to help me here ?
> Here is a (simplified) snippet of the code, in case it helps answering my 
> query. I basically create a goroutine for every input file (assume max 8) and 
> then wait for processing of all files to finish. Each goroutine processes a 
> line within the file and then any records which match a certain criteria are 
> appended to a slice. After all lines have been processed in a file, the list 
> is Sent to a channel. Finally, in the Closer goroutine, I wait for all 
> goroutines to finish and close the channel once all goroutines have finished :
> 
> package main
> 
> import (
>       "bufio"
>       "compress/gzip"
>       "flag"
>       "fmt"
>       "log"
>       "os"
>       "path/filepath"
>       "strings"
>       "sync"
>       "github.com/en-vee/alog"
> )
> 
> const (
>       inputFilePrefix = "subscriber_db_"
> )
> 
> var (
>       inputDir              string
> )
> 
> type QuarantineObject struct {
>       objectType string
>       id         string
> }
> 
> func init() {
>       flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is 
> to be analysed")
> }
> 
> func main() {
> 
>       var err error
>       alog.SetLogLevel(alog.TRACE)
>       flag.Parse()
> 
>       // Validation of input parameters
>       if inputDir == "" {
>               fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
>               flag.Usage()
>               os.Exit(1)
>       }
> 
>       // Is the input directory valid ?
>       if _, err := os.Stat(inputDir); os.IsNotExist(err) {
>               fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", 
> inputDir)
>               flag.Usage()
>               os.Exit(1)
>       }
> 
>       // Determine all subscriber files by matching on the subscriber files 
> prefix
> 
>       inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", 
> inputDir, inputFilePrefix))
>       if err != nil {
>               fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
>               os.Exit(1)
>       }
> 
>       // Loop through all subscriber files
>       // Make a goroutine for processing each file
>       // Create a channel to receive the quarantined objects
>       qObjChannel := make(chan []QuarantineObject, len(inputFileNames))
> 
>       //runtime.GOMAXPROCS(len(inputFileNames))
>       var wg sync.WaitGroup
>       for _, inputFileGz := range inputFileNames {
>               wg.Add(1)
>               go func(inputFileGz string) {
>                       nRecords := 0
> 
>                       qObjList := make([]QuarantineObject, 0, 0)
>                       defer wg.Done()
>                       defer func() {
>                               alog.Trace("Finished Processing File : %s. 
> Total Records Analysed : %d\n", inputFileGz, nRecords)
>                       }()
>                       // Open the file as a GZIP stream
>                       
> alog.Trace("==================================================================================================================================")
>                       alog.Trace("Processing Input File : %s", inputFileGz)
>                       
> alog.Trace("==================================================================================================================================")
> 
>                       f, err := os.Open(inputFileGz)
>                       if err != nil {
>                               fmt.Fprintf(os.Stderr, "Error opening file : 
> %v\n", err)
>                               return
>                       }
>                       defer f.Close()
> 
>                       fgz, err := gzip.NewReader(f)
>                       if err != nil {
>                               fmt.Fprintf(os.Stderr, "Error creating GZIP 
> reader : %v\n", err)
>                               return
>                       }
>                       defer fgz.Close()
> 
>                       scanner := bufio.NewScanner(fgz)
> 
>                       // Iterate over all lines of the file and decode
> 
>                       for scanner.Scan() {
>                               qObject := decodeLine()
>                               if qObject.IsQuarantined() {
>                                       qObjList = append(qObjList, qObject)
>                               }
>                       }
>                       ///////////////////////////////////////////////////////
>                       // After all lines have been processed, Send to Channel
>                       ///////////////////////////////////////////////////////
>                       qObjChannel <- qObjList
>               }(inputFileGz)
> 
>       }
> 
>       
>       fmt.Println("Waiting for processing of all files to finish")
>       ///////////////////////////////////////////////////////
>       // Closer GoRoutine
>       ///////////////////////////////////////////////////////
>       go func() {
>               wg.Wait()
>               close(qObjChannel)
>               fmt.Println("Quarantined Objects List")
>               fmt.Println("------------------------")
>       }()
> 
>       
>       qFound := false
> 
>       for qObjList := range qObjChannel {
>               for _, qObj := range qObjList {
>                       fmt.Println(qObj.id, "--->", qObj.objectType)
>                       qFound = true
>               }
>       }
> 
> }
> 
> 
> 
>> On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>> I am running a program which reads multiple gzipped input files and performs 
>> some processing on each line of the file. 
>> It creates 8 goroutines (1 per input file which is to be processed. the 
>> number of such files can be thought to remain 8 at the max).
>> Each of the go routines send to a buffered channel after finishing 
>> processing of their respective file.
>> After creating the go routines, the program waits (using WaitGroup) for all 
>> go routines to finish and also drain the channel for all the values sent by 
>> the go routines.
>> 
>> I have an 4 core CPU with 2 threads per core = 8 logical cores.
>> 
>> But I set GOMAXPROCS=4
>> 
>> When I run the program with scheduler trace interval set to 1000ms, I can 
>> see the following :
>> 
>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=0 runqueue=0 [0 0 0 1]
>> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=1 runqueue=0 [1 0 5 0]
>> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=1 runqueue=1 [0 0 1 0]
>> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
>> idlethreads=2 runqueue=0 [0 0 0 0]
>> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
>> idlethreads=2 runqueue=1 [0 0 0 4]
>> 
>> 
>> If I create 8 go routines, shouldn't they all be distributed equally among 
>> the 4 logical cores ?
>> 
>> Why do some runqueues of the logical cores show values of 4 or 5 and some 
>> have values of 0 ?
>> 
>> I was hoping to see something like which I according to my understanding 
>> means that all 4 processors have 1 go routine each waiting in the local 
>> runqueue and at the same time has 1 go routine running on the assigned OS 
>> Thread :
>> 
>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
>> idlethreads=0 runqueue=0 [1 1 1 1]
>> 
>> Thanks.
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to golang-nuts+unsubscr...@googlegroups.com.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com.

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/golang-nuts/A89FE888-A764-4D55-9D5B-1B011AAF849E%40ix.netcom.com.

Reply via email to