This is an automated email from the ASF dual-hosted git repository.

mitchell852 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git


The following commit(s) were added to refs/heads/master by this push:
     new e1eb046  Treat io.EOF as an error and retry up to 10 times for files with that error (#4978)
e1eb046 is described below

commit e1eb0462b25e2567e41ed29bac5d55de80bc16e6
Author: Zach Hoffman <[email protected]>
AuthorDate: Fri Aug 21 18:03:55 2020 +0000

    Treat io.EOF as an error and retry up to 10 times for files with that error (#4978)
---
 infrastructure/cdn-in-a-box/enroller/enroller.go | 57 +++++++++++++++++-------
 1 file changed, 40 insertions(+), 17 deletions(-)

diff --git a/infrastructure/cdn-in-a-box/enroller/enroller.go 
b/infrastructure/cdn-in-a-box/enroller/enroller.go
index f3a9cbd..0beecb6 100644
--- a/infrastructure/cdn-in-a-box/enroller/enroller.go
+++ b/infrastructure/cdn-in-a-box/enroller/enroller.go
@@ -27,6 +27,7 @@ import (
        "net/url"
        "os"
        "path/filepath"
+       "regexp"
        "strings"
        "time"
 
@@ -82,7 +83,7 @@ func enrollType(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.Type
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Type: %s\n", err)
                return err
        }
@@ -109,7 +110,7 @@ func enrollCDN(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.CDN
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding CDN: %s\n", err)
                return err
        }
@@ -135,7 +136,7 @@ func enrollASN(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.ASN
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding ASN: %s\n", err)
                return err
        }
@@ -162,7 +163,7 @@ func enrollCachegroup(toSession *session, r io.Reader) 
error {
        dec := json.NewDecoder(r)
        var s tc.CacheGroupNullable
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Cachegroup: %s\n", err)
                return err
        }
@@ -188,7 +189,7 @@ func enrollDeliveryService(toSession *session, r io.Reader) 
error {
        dec := json.NewDecoder(r)
        var s tc.DeliveryServiceNullable
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding DeliveryService: %s\n", err)
                return err
        }
@@ -216,7 +217,7 @@ func enrollDeliveryServiceServer(toSession *session, r 
io.Reader) error {
        // DeliveryServiceServers lists ds xmlid and array of server names.  
Use that to create multiple DeliveryServiceServer objects
        var dss tc.DeliveryServiceServers
        err := dec.Decode(&dss)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding DeliveryServiceServer: %s\n", err)
                return err
        }
@@ -258,7 +259,7 @@ func enrollDivision(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.Division
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Division: %s\n", err)
                return err
        }
@@ -284,7 +285,7 @@ func enrollOrigin(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.Origin
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Origin: %s\n", err)
                return err
        }
@@ -310,7 +311,7 @@ func enrollParameter(toSession *session, r io.Reader) error 
{
        dec := json.NewDecoder(r)
        var params []tc.Parameter
        err := dec.Decode(&params)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Parameter: %s\n", err)
                return err
        }
@@ -375,7 +376,7 @@ func enrollPhysLocation(toSession *session, r io.Reader) 
error {
        dec := json.NewDecoder(r)
        var s tc.PhysLocation
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding PhysLocation: %s\n", err)
                return err
        }
@@ -401,7 +402,7 @@ func enrollRegion(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.Region
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Region: %s\n", err)
                return err
        }
@@ -427,7 +428,7 @@ func enrollStatus(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.StatusNullable
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Status: %s\n", err)
                return err
        }
@@ -453,7 +454,7 @@ func enrollTenant(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.Tenant
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Tenant: %s\n", err)
                return err
        }
@@ -480,7 +481,7 @@ func enrollUser(toSession *session, r io.Reader) error {
        var s tc.User
        err := dec.Decode(&s)
        log.Infof("User is %++v\n", s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding User: %s\n", err)
                return err
        }
@@ -508,7 +509,7 @@ func enrollProfile(toSession *session, r io.Reader) error {
        var profile tc.Profile
 
        err := dec.Decode(&profile)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Profile: %s\n", err)
                return err
        }
@@ -623,7 +624,7 @@ func enrollServer(toSession *session, r io.Reader) error {
        dec := json.NewDecoder(r)
        var s tc.ServerNullable
        err := dec.Decode(&s)
-       if err != nil && err != io.EOF {
+       if err != nil {
                log.Infof("error decoding Server: %s\n", err)
                return err
        }
@@ -659,7 +660,12 @@ func newDirWatcher(toSession *session) (*dirWatcher, 
error) {
                const (
                        processed = ".processed"
                        rejected  = ".rejected"
+                       retry  = ".retry"
                )
+               originalNameRegex := regexp.MustCompile(`(\.retry)*$`)
+
+               emptyCount := map[string]int{}
+               const maxEmptyTries = 10
 
                for {
                        select {
@@ -692,10 +698,27 @@ func newDirWatcher(toSession *session) (*dirWatcher, 
error) {
                                if f, ok := dw.watched[dir]; ok {
                                        t := filepath.Base(dir)
                                        log.Infoln("creating " + t + " from " + 
event.Name)
-                                       // TODO: ensure file content is there 
before attempting to read.  For now, this does the trick..
+                                       // Sleep for 100 milliseconds so that 
the file content is probably there when the directory watcher
+                                       // sees the file
                                        time.Sleep(100 * time.Millisecond)
 
                                        err := f(toSession, event.Name)
+                                       // If a file is empty, try reading from 
it 10 times before giving up on that file
+                                       if err == io.EOF {
+                                               originalName := 
originalNameRegex.ReplaceAllString(event.Name, "")
+                                               if _, exists := 
emptyCount[originalName]; !exists {
+                                                       
emptyCount[originalName] = 0
+                                               }
+                                               emptyCount[originalName]++
+                                               log.Infof("empty json object 
%s: %s\ntried file %d out of %d times", originalName, err.Error(), 
emptyCount[originalName], maxEmptyTries)
+                                               if emptyCount[originalName] < 
maxEmptyTries {
+                                                       newName := event.Name + 
retry
+                                                       if err := 
os.Rename(event.Name, newName); err != nil {
+                                                               
log.Infof("error renaming %s to %s: %s", event.Name, newName, err)
+                                                       }
+                                                       continue
+                                               }
+                                       }
                                        if err != nil {
                                                log.Infof("error creating %s 
from %s: %s\n", dir, event.Name, err.Error())
                                        } else {

Reply via email to