Hi, Is anyone able to help me here ?
Here is a (simplified) snippet of the code, in case it helps answering my 
query. I basically create a goroutine for every input file (assume max 8) 
and then wait for processing of all files to finish. Each goroutine 
processes a line within the file and then any records which match a certain 
criteria are appended to a slice. After all lines have been processed in a 
file, the list is Sent to a channel. Finally, in the Closer goroutine, I 
wait for all goroutines to finish and close the channel once all goroutines 
have finished :

package main

import (
"bufio"
"compress/gzip"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"github.com/en-vee/alog"
)

const (
inputFilePrefix = "subscriber_db_"
)

var (
inputDir              string
)

type QuarantineObject struct {
objectType string
id         string
}

func init() {
flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be 
analysed")
}

func main() {

var err error
alog.SetLogLevel(alog.TRACE)
flag.Parse()

// Validation of input parameters
if inputDir == "" {
fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
flag.Usage()
os.Exit(1)
}

// Is the input directory valid ?
if _, err := os.Stat(inputDir); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
flag.Usage()
os.Exit(1)
}

// Determine all subscriber files by matching on the subscriber files prefix

inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, 
inputFilePrefix))
if err != nil {
fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
os.Exit(1)
}

// Loop through all subscriber files
// Make a goroutine for processing each file
// Create a channel to receive the quarantined objects
qObjChannel := make(chan []QuarantineObject, len(inputFileNames))

//runtime.GOMAXPROCS(len(inputFileNames))
var wg sync.WaitGroup
for _, inputFileGz := range inputFileNames {
wg.Add(1)
go func(inputFileGz string) {
nRecords := 0

qObjList := make([]QuarantineObject, 0, 0)
defer wg.Done()
defer func() {
alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", 
inputFileGz, nRecords)
}()
// Open the file as a GZIP stream
alog.Trace("==================================================================================================================================")
alog.Trace("Processing Input File : %s", inputFileGz)
alog.Trace("==================================================================================================================================")

f, err := os.Open(inputFileGz)
if err != nil {
fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
return
}
defer f.Close()

fgz, err := gzip.NewReader(f)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
return
}
defer fgz.Close()

scanner := bufio.NewScanner(fgz)

// Iterate over all lines of the file and decode

for scanner.Scan() {
qObject := decodeLine()
if qObject.IsQuarantined() {
qObjList = append(qObjList, qObject)
}
}
///////////////////////////////////////////////////////
// After all lines have been processed, Send to Channel
///////////////////////////////////////////////////////
qObjChannel <- qObjList
}(inputFileGz)

}

fmt.Println("Waiting for processing of all files to finish")
///////////////////////////////////////////////////////
// Closer GoRoutine
///////////////////////////////////////////////////////
go func() {
wg.Wait()
close(qObjChannel)
fmt.Println("Quarantined Objects List")
fmt.Println("------------------------")
}()

qFound := false

for qObjList := range qObjChannel {
for _, qObj := range qObjList {
fmt.Println(qObj.id, "--->", qObj.objectType)
qFound = true
}
}

}



On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>
> I am running a program which reads multiple gzipped input files and 
> performs some processing on each line of the file. 
> It creates 8 goroutines (1 per input file which is to be processed. the 
> number of such files can be thought to remain 8 at the max).
> Each of the go routines send to a buffered channel after finishing 
> processing of their respective file.
> After creating the go routines, the program waits (using WaitGroup) for 
> all go routines to finish and also drain the channel for all the values 
> sent by the go routines.
>
> I have an 4 core CPU with 2 threads per core = 8 logical cores.
>
> But I set GOMAXPROCS=4
>
> When I run the program with scheduler trace interval set to 1000ms, I can 
> see the following :
>
> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
> idlethreads=0 runqueue=0 [0 0 0 1]
> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
> idlethreads=1 runqueue=0 [1 0 5 0]
> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
> idlethreads=1 runqueue=1 [0 0 1 0]
> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
> idlethreads=2 runqueue=0 [0 0 0 0]
> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
> idlethreads=2 runqueue=1 [0 0 0 4]
>
>
> If I create 8 go routines, shouldn't they all be distributed equally among 
> the 4 logical cores ?
>
> Why do some runqueues of the logical cores show values of 4 or 5 and some 
> have values of 0 ?
>
> I was hoping to see something like which I according to my understanding 
> means that all 4 processors have 1 go routine each waiting in the local 
> runqueue and at the same time has 1 go routine running on the assigned OS 
> Thread :
>
> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 
> idlethreads=0 runqueue=0 [1 1 1 1]
>
> Thanks.
>

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com.

Reply via email to