Tidy solution, removing the songch, doh https://play.golang.org/p/2VppDS0U27

On Saturday, October 29, 2016 at 8:42:02 PM UTC+1, Florian Weimer wrote:
>
> I'm trying to parse OpenPGP key server dumps.  Throughput is not too 
> bad so far, but I'd like to speed things up by introducing 
> parallelism. 
>
> The dumps are split into several files (each about 40 MB large, with a 
> few thousand keys).  The toy version I have so far does not need any 
> coordination between the processing of individual files. 
>
> This works quite well when running the sequential version in parallel, 
> using GNU parallel.  With GOMAXPROCS=1, serial execution time is 
> reduced from 141 seconds to 31 seconds.  This is roughly what I would 
> expect from six cores with hyperthreading and use of GNU parallel 
> (which has some overhead of its own in this scenario). 
>
> However, the version with built-in parallelism runs in 55 seconds, so 
> only half as fast as the GNU parallel approach.  I would have expected 
> it to fare better compared to that.  The parallel version is my second 
> one which has decent performance.  I also tried a variant which has a 
> per-thread buffer which is occasionally written to standard output, 
> synchronized by a sync.Mutex.  (It would likely have benefited from a 
> sync.Mutex.TryLock() function, delaying the buffer flush if there was 
> contention.)  This was still significantly slower than the external 
> parallelization, but perhaps a little faster than the parallel version 
> attached below.  I think both parallel approaches produce more garbage 
> than the sequential version, but likely not enough to explain the 
> speed difference compared to the sequential version with external 
> parallelization. 
>
> “perf top” suggests that a lot of time is spent in GC-related Go 
> functions (which is expected, considering what the program does).  But 
> vmstat shows a large number of context switches, which I find 
> surprising.  It is much higher than the number of context switches 
> during a GNU parallel run. 
>
> Most tests were run with the current master branch. 
>
> Is there anything else I could try to make the internally parallelized 
> version as fast as the externally parallelized one? 
>
> package main 
>
> import ( 
>         "bytes" 
>         "bufio" 
>         "flag" 
>         "fmt" 
>         "io" 
>         openpgp "golang.org/x/crypto/openpgp/packet" 
>         "os" 
>         "reflect" 
>         "runtime" 
>         "sync" 
> ) 
>
> var verbose bool 
>
> var outputLock sync.Mutex 
>
> // Print the string to standard output, with optional synchronization. 
> type Output interface { 
>         Print(format string, args ...interface{}) 
> } 
>
> func packetError(path string, packet openpgp.OpaquePacket, err error) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
> } 
>
> func printPacketType(packet openpgp.Packet) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
> } 
>
> // A printer for OpenPGP user IDs which writes multiple key IDs en 
> // bloc, to avoid lock contention. 
> type uidPrinter struct { 
>         buffer bytes.Buffer 
>         count int 
> } 
>
> // Print one user ID.  The user ID might not be printed until Flush() 
> // is called. 
> func (p *uidPrinter) Print(uid *openpgp.UserId) { 
>         fmt.Fprintf(&p.buffer, "uid: %#v\n", uid.Id) 
>         p.count++ 
>         // Prevent the buffer from becoming too large. 
>         if p.count > 1000 { 
>                 p.Flush() 
>         } 
> } 
>
> // Print all the staged key IDs. 
> func (p *uidPrinter) Flush() { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
>         os.Stdout.Write(p.buffer.Bytes()) 
>         p.buffer.Truncate(0) 
>         p.count = 0 
> } 
>
> func printError(format string, args ...interface{}) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
>         fmt.Fprintf(os.Stderr, format, args...) 
> } 
>
> func readFile(path string, output Output) error { 
>         file, err := os.Open(path) 
>         if err != nil { 
>                 return err 
>         } 
>         defer file.Close() 
>         buf := bufio.NewReader(file) 
>         packets := openpgp.NewOpaqueReader(buf) 
>         for { 
>                 op, err := packets.Next() 
>                 if err != nil { 
>                         if (err == io.EOF) { 
>                                 break 
>                         } 
>                         return err 
>                 } 
>                 p, err := op.Parse() 
>                 if err != nil { 
>                         continue 
>                 } 
>                 if verbose { 
>                         output.Print("%s\n", reflect.TypeOf(p).String()) 
>                 } 
>                 if uid, ok := p.(*openpgp.UserId); ok { 
>                         output.Print("uid: %#v\n", uid.Id) 
>                 } 
>         } 
>         return nil 
> } 
>
> // Buffered output to standard output, without synchronization. 
> type sequentialOutput struct { 
>         buffer *bufio.Writer 
> } 
>
> func newSequentialOutput() *sequentialOutput { 
>         return &sequentialOutput{bufio.NewWriter(os.Stdout)} 
> } 
>
> func (p *sequentialOutput) Print(format string, args ...interface{}) { 
>         fmt.Fprintf(p.buffer, format, args...) 
> } 
>
> func (p *sequentialOutput) Stop() { 
>         p.buffer.Flush() 
> } 
>
> func processSequential(files []string) { 
>         output := newSequentialOutput() 
>         for _, path := range files { 
>                 err := readFile(path, output) 
>                 if err != nil { 
>                         printError("%s: error: %s\n", path, err.Error()) 
>                 } 
>         } 
>         output.Stop() 
> } 
>
> // Return a closed channel which contains the given path strings. 
> func channelOfPaths(paths []string) chan string { 
>         ch := make(chan string, len(paths)) 
>         for _, path := range paths { 
>                 ch <- path 
>         } 
>         close(ch) 
>         return ch 
> } 
>
> // Spawned as a goroutine to process files from a channel. 
> func processFromChannel(filesChannel chan string, ackChannel chan 
> struct{}, 
>         output Output) { 
>         for path := range filesChannel { 
>                 err := readFile(path, output) 
>                 if err != nil { 
>                         printError("%s: error: %s\n", path, err.Error()) 
>                 } 
>                 ackChannel <- struct{}{} 
>         } 
> } 
>
> // Parallel output to standard output, with synchronization to prevent 
> // interleaving. 
> type parallelOutput struct { 
>         dataChannel chan string        // Strings to print, "" means termination. 
>         ackChannel chan struct{} // Signals completed termination request. 
> } 
>
> func newParallelOutput() *parallelOutput { 
>         p := &parallelOutput{ 
>                 dataChannel: make(chan string, 5000), 
>                 ackChannel: make(chan struct{})} 
>         go parallelOutputGoroutine(p) 
>         return p 
> } 
>
>
> func parallelOutputGoroutine(p *parallelOutput) { 
>         var buf = bufio.NewWriter(os.Stdout) 
>         for s := range p.dataChannel { 
>                 if s == "" { 
>                         break 
>                 } 
>                 buf.WriteString(s) 
>         } 
>         buf.Flush() 
>         // Tell Stop() that processing is complete. 
>         close(p.ackChannel) 
> } 
>
> func (p *parallelOutput) Print(format string, args ...interface{}) { 
>         s := fmt.Sprintf(format, args...) 
>         if s != "" { 
>                 p.dataChannel <- s 
>         } 
> } 
>
> func (p *parallelOutput) Stop() { 
>         // Request termination. 
>         p.dataChannel <- "" 
>         // Wait for completion of all pending requests. 
>         _, _ = <- p.ackChannel 
> } 
>
> // Number of parallel threads, controlled by the -threads flag. 
> var threads int 
>
> // Process the files in parallel. 
> func processParallel(files []string) { 
>         if len(files) == 0 { 
>                 return 
>         } 
>
>         filesChannel := channelOfPaths(files) 
>
>         // This channel is used to detect termination. 
>         ackChannel := make(chan struct{}) 
>
>         fmt.Fprintf(os.Stderr, "info: threads: %d\n", threads) 
>
>         output := newParallelOutput() 
>         for i := 0; i < threads; i++  { 
>                 go processFromChannel(filesChannel, ackChannel, output) 
>         } 
>
>         // Wait for termination of the conversion. 
>         acks := 0 
>         for _ = range ackChannel { 
>                 acks++ 
>                 if acks == len(files) { 
>                         break 
>                 } 
>         } 
>
>         // Wait until all pending entries have been printed. 
>         output.Stop() 
> } 
>
> func main() { 
>         var sequential bool 
>         flag.BoolVar(&verbose, "verbose", false, "more verbose output") 
>         flag.BoolVar(&sequential, "sequential", false, "disable concurrency") 
>         flag.IntVar(&threads, "threads", runtime.GOMAXPROCS(0), 
>                 "number of processing threads") 
>         flag.Parse() 
>          
>         files := flag.Args() 
>         if (sequential) { 
>                 processSequential(files) 
>         } else { 
>                 processParallel(files) 
>         } 
> } 
>

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to