This is an automated email from the ASF dual-hosted git repository.

mitchell852 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git


The following commit(s) were added to refs/heads/master by this push:
     new b0dfe23  Make Traffic Stats fail over to other Traffic Monitors (#5983)
b0dfe23 is described below

commit b0dfe231a05376bec982eddd884de4165cb82ba3
Author: Rawlin Peters <[email protected]>
AuthorDate: Tue Jul 6 09:25:16 2021 -0600

    Make Traffic Stats fail over to other Traffic Monitors (#5983)
    
    * Make Traffic Stats retry another Traffic Monitor if one is down
    
    * Cleanup, add changelog, test
---
 CHANGELOG.md                        |   1 +
 traffic_stats/traffic_stats.go      | 171 +++++++++++++++++-------------------
 traffic_stats/traffic_stats_test.go | 103 ++++++++++++++++++++++
 3 files changed, 187 insertions(+), 88 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 33ba8ff..0ccb3dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -64,6 +64,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - [#5407](https://github.com/apache/trafficcontrol/issues/5407) - Make sure that you cannot add two servers with identical content
 - [#2881](https://github.com/apache/trafficcontrol/issues/2881) - Some API endpoints have incorrect Content-Types
 - [#5863](https://github.com/apache/trafficcontrol/issues/5863) - Traffic Monitor logs warnings to `log_location_info` instead of `log_location_warning`
+- [#5942](https://github.com/apache/trafficcontrol/issues/5942) - Traffic Stats does not fail over to another Traffic Monitor when one stops responding
 - [#5363](https://github.com/apache/trafficcontrol/issues/5363) - Postgresql version changeable by env variable
 - [#5405](https://github.com/apache/trafficcontrol/issues/5405) - Prevent Tenant update from choosing child as new parent
 - [#5384](https://github.com/apache/trafficcontrol/issues/5384) - New grids will now properly remember the current page number.
diff --git a/traffic_stats/traffic_stats.go b/traffic_stats/traffic_stats.go
index 5088a19..e2e4707 100644
--- a/traffic_stats/traffic_stats.go
+++ b/traffic_stats/traffic_stats.go
@@ -37,8 +37,8 @@ import (
 
        "github.com/apache/trafficcontrol/lib/go-tc"
        "github.com/apache/trafficcontrol/lib/go-util"
-
        client "github.com/apache/trafficcontrol/traffic_ops/v2-client"
+
        log "github.com/cihub/seelog"
        influx "github.com/influxdata/influxdb/client/v2"
 )
@@ -47,15 +47,6 @@ const UserAgent = "traffic-stats"
 const TrafficOpsRequestTimeout = time.Second * time.Duration(10)
 
 const (
-       // FATAL will exit after printing error
-       FATAL = iota
-       // ERROR will just keep going, print error
-       ERROR
-       // WARN will keep going and print a warning
-       WARN
-)
-
-const (
        defaultPollingInterval             = 10
        defaultDailySummaryPollingInterval = 60
        defaultConfigInterval              = 300
@@ -88,8 +79,8 @@ type StartupConfig struct {
 // RunningConfig is used to store runtime configuration for Traffic Stats.  
This includes information
 // about caches, cachegroups, and health urls
 type RunningConfig struct {
-       HealthUrls      map[string]map[string]string // the 1st map key is 
CDN_name, the second is DsStats or CacheStats
-       CacheMap        map[string]tc.Server         // map hostName to cache
+       HealthUrls      map[string]map[string][]string // the 1st map key is 
CDN_name, the second is DsStats or CacheStats
+       CacheMap        map[string]tc.Server           // map hostName to cache
        LastSummaryTime time.Time
 }
 
@@ -115,11 +106,17 @@ func main() {
 
        configFile := flag.String("cfg", "", "The config file")
        flag.Parse()
+       if *configFile == "" {
+               flag.Usage()
+               panic("-cfg is required")
+       }
 
        config, err = loadStartupConfig(*configFile, config)
 
        if err != nil {
-               errHndlr(err, FATAL)
+               err = fmt.Errorf("could not load startup config: %v", err)
+               log.Error(err)
+               panic(err)
        }
 
        Bps = make(map[string]influx.BatchPoints)
@@ -145,7 +142,7 @@ func main() {
                        log.Info("HUP Received - reloading config")
                        newConfig, err := loadStartupConfig(*configFile, config)
                        if err != nil {
-                               errHndlr(err, ERROR)
+                               log.Errorf("could not load startup config: %v", 
err)
                        } else {
                                config = newConfig
                                tickers = setTimers(config)
@@ -153,12 +150,12 @@ func main() {
                case <-termChan:
                        log.Info("Shutdown Request Received - Sending stored 
metrics then quitting")
                        for _, val := range Bps {
-                               sendMetrics(config, runningConfig, val, false)
+                               sendMetrics(config, val, false)
                        }
                        os.Exit(0)
                case <-tickers.Publish:
                        for key, val := range Bps {
-                               go sendMetrics(config, runningConfig, val, true)
+                               go sendMetrics(config, val, true)
                                delete(Bps, key)
                        }
                case runningConfig = <-configChan:
@@ -166,9 +163,9 @@ func main() {
                        go getToData(config, false, configChan)
                case <-tickers.Poll:
                        for cdnName, urls := range runningConfig.HealthUrls {
-                               for _, url := range urls {
-                                       log.Debug(cdnName, " -> ", url)
-                                       go calcMetrics(cdnName, url, 
runningConfig.CacheMap, config, runningConfig)
+                               for _, u := range urls {
+                                       log.Debug(cdnName, " -> ", u)
+                                       go calcMetrics(cdnName, u, 
runningConfig.CacheMap, config)
                                }
                        }
                case now := <-tickers.DailySummary:
@@ -247,9 +244,9 @@ func loadStartupConfig(configFile string, oldConfig 
StartupConfig) (StartupConfi
        if len(config.InfluxURLs) == 0 {
                return config, fmt.Errorf("No InfluxDB urls provided in 
influxUrls, please provide at least one valid URL.  e.g. \"influxUrls\": 
[\"http://localhost:8086\"]";)
        }
-       for _, url := range config.InfluxURLs {
+       for _, u := range config.InfluxURLs {
                influxDBProps := InfluxDBProps{
-                       URL: url,
+                       URL: u,
                }
                config.InfluxDBs = append(config.InfluxDBs, &influxDBProps)
        }
@@ -274,8 +271,7 @@ func calcDailySummary(now time.Time, config StartupConfig, 
runningConfig Running
                // influx connection
                influxClient, err := influxConnect(config)
                if err != nil {
-                       log.Error("Could not connect to InfluxDb to get daily 
summary stats!!")
-                       errHndlr(err, ERROR)
+                       log.Errorf("could not connect to InfluxDb to get daily 
summary stats: %v", err)
                        return
                }
 
@@ -426,7 +422,7 @@ func writeSummaryStats(config StartupConfig, statsSummary 
tc.StatsSummary) {
        }
        _, _, err = to.CreateSummaryStats(statsSummary)
        if err != nil {
-               log.Error(err)
+               log.Errorf("could not create summary stats: %v", err)
        }
 }
 
@@ -480,14 +476,29 @@ func getToData(config StartupConfig, init bool, 
configChan chan RunningConfig) {
        cacheStatPath = strings.Replace(cacheStatPath, "=,", "=", 1)
        dsStatPath = strings.Replace(dsStatPath, "=,", "=", 1)
 
-       runningConfig.HealthUrls = make(map[string]map[string]string)
-       for _, server := range servers {
-               if server.Type == "RASCAL" && server.Status != 
config.StatusToMon {
+       setHealthURLs(config, &runningConfig, cacheStatPath, dsStatPath)
+
+       lastSummaryTimeResponse, _, err := 
to.GetSummaryStatsLastUpdated(util.StrPtr("daily_maxgbps"))
+       if err != nil {
+               log.Errorf("unable to get summary stats last updated: %v", err)
+       } else if lastSummaryTimeResponse.Response.SummaryTime == nil {
+               log.Warn("unable to get last updated stats summary timestamp: 
daily_maxgbps stats summary not reported yet")
+       } else {
+               runningConfig.LastSummaryTime = 
*lastSummaryTimeResponse.Response.SummaryTime
+       }
+
+       configChan <- runningConfig
+}
+
+func setHealthURLs(config StartupConfig, runningConfig *RunningConfig, 
cacheStatPath string, dsStatPath string) {
+       runningConfig.HealthUrls = make(map[string]map[string][]string)
+       for _, server := range runningConfig.CacheMap {
+               if server.Type == tc.MonitorTypeName && server.Status != 
config.StatusToMon {
                        log.Debugf("Skipping %s.%s.  Looking for status %s but 
got status %s", server.HostName, server.DomainName, config.StatusToMon, 
server.Status)
                        continue
                }
 
-               if server.Type == "RASCAL" && server.Status == 
config.StatusToMon {
+               if server.Type == tc.MonitorTypeName && server.Status == 
config.StatusToMon {
                        cdnName := server.CDNName
                        if cdnName == "" {
                                log.Error("Unable to find CDN name for " + 
server.HostName + ".. skipping")
@@ -495,62 +506,53 @@ func getToData(config StartupConfig, init bool, 
configChan chan RunningConfig) {
                        }
 
                        if runningConfig.HealthUrls[cdnName] == nil {
-                               runningConfig.HealthUrls[cdnName] = 
make(map[string]string)
+                               runningConfig.HealthUrls[cdnName] = 
make(map[string][]string)
                        }
-                       url := "http://"; + server.HostName + "." + 
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + cacheStatPath
-                       runningConfig.HealthUrls[cdnName]["CacheStats"] = url
-                       url = "http://"; + server.HostName + "." + 
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + dsStatPath
-                       runningConfig.HealthUrls[cdnName]["DsStats"] = url
+                       healthURL := "http://"; + server.HostName + "." + 
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + cacheStatPath
+                       runningConfig.HealthUrls[cdnName]["CacheStats"] = 
append(runningConfig.HealthUrls[cdnName]["CacheStats"], healthURL)
+                       healthURL = "http://"; + server.HostName + "." + 
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + dsStatPath
+                       runningConfig.HealthUrls[cdnName]["DsStats"] = 
append(runningConfig.HealthUrls[cdnName]["DsStats"], healthURL)
                }
        }
-
-       lastSummaryTimeResponse, _, err := 
to.GetSummaryStatsLastUpdated(util.StrPtr("daily_maxgbps"))
-       if err != nil {
-               errHndlr(err, ERROR)
-       } else if lastSummaryTimeResponse.Response.SummaryTime == nil {
-               errHndlr(errors.New("unable to get last updated stats summary 
timestamp: daily_maxgbps stats summary not reported yet"), WARN)
-       } else {
-               runningConfig.LastSummaryTime = 
*lastSummaryTimeResponse.Response.SummaryTime
-       }
-
-       configChan <- runningConfig
 }
 
-func calcMetrics(cdnName string, url string, cacheMap map[string]tc.Server, 
config StartupConfig, runningConfig RunningConfig) {
-       sampleTime := int64(time.Now().Unix())
+func calcMetrics(cdnName string, urls []string, cacheMap map[string]tc.Server, 
config StartupConfig) {
+       sampleTime := time.Now().Unix()
        // get the data from trafficMonitor
-       trafMonData, err := getURL(url)
-       if err != nil {
-               log.Error("Unable to connect to Traffic Monitor @ ", url, " - 
skipping timeslot")
+       var trafMonData []byte
+       var err error
+       var healthURL string
+       for _, u := range urls {
+               trafMonData, err = getURL(u)
+               if err != nil {
+                       log.Errorf("error getting %s stats URL %s: %v", 
cdnName, u, err)
+                       continue
+               }
+               healthURL = u
+               log.Infof("successfully got %s stats URL %s", cdnName, u)
+               break
+       }
+       if healthURL == "" {
+               log.Errorf("unable to get any %s stats URL - skipping 
timeslot", cdnName)
                return
        }
 
-       if strings.Contains(url, "CacheStats") {
+       if strings.Contains(healthURL, "CacheStats") {
                err = calcCacheValues(trafMonData, cdnName, sampleTime, 
cacheMap, config)
-               errHndlr(err, ERROR)
-       } else if strings.Contains(url, "DsStats") {
+               if err != nil {
+                       log.Errorf("error calculating cache metric values for 
CDN %s: %v", cdnName, err)
+               }
+       } else if strings.Contains(healthURL, "DsStats") {
                err = calcDsValues(trafMonData, cdnName, sampleTime, config)
-               errHndlr(err, ERROR)
-       } else {
-               log.Warn("Don't know what to do with ", url)
-       }
-}
-
-func errHndlr(err error, severity int) {
-       if err != nil {
-               switch {
-               case severity == WARN:
-                       log.Warn(err)
-               case severity == ERROR:
-                       log.Error(err)
-               case severity == FATAL:
-                       log.Error(err)
-                       panic(err)
+               if err != nil {
+                       log.Errorf("error calculating delivery service metric 
values for CDN %s: %v", cdnName, err)
                }
+       } else {
+               log.Warn("Don't know what to do with given %s stats URL: ", 
cdnName, healthURL)
        }
 }
 
-func calcDsValues(rascalData []byte, cdnName string, sampleTime int64, config 
StartupConfig) error {
+func calcDsValues(tmData []byte, cdnName string, sampleTime int64, config 
StartupConfig) error {
        type DsStatsJSON struct {
                Pp              string `json:"pp"`
                Date            string `json:"date"`
@@ -563,7 +565,7 @@ func calcDsValues(rascalData []byte, cdnName string, 
sampleTime int64, config St
        }
 
        var jData DsStatsJSON
-       err := json.Unmarshal(rascalData, &jData)
+       err := json.Unmarshal(tmData, &jData)
        if err != nil {
                return fmt.Errorf("could not unmarshall deliveryservice stats 
JSON - %v", err)
        }
@@ -606,7 +608,7 @@ func calcDsValues(rascalData []byte, cdnName string, 
sampleTime int64, config St
                        statTime := strconv.Itoa(dsMetricData[0].Time)
                        msInt, err := strconv.ParseInt(statTime, 10, 64)
                        if err != nil {
-                               errHndlr(err, ERROR)
+                               log.Errorf("calculating delivery service metric 
values: error parsing stat time: %v", err)
                        }
 
                        newTime := time.Unix(0, msInt*int64(time.Millisecond))
@@ -626,7 +628,7 @@ func calcDsValues(rascalData []byte, cdnName string, 
sampleTime int64, config St
                                newTime,
                        )
                        if err != nil {
-                               errHndlr(err, ERROR)
+                               log.Errorf("calculating delivery service metric 
values: error creating new influxDB point: %v", err)
                                continue
                        }
                        bps.AddPoint(pt)
@@ -652,7 +654,7 @@ func calcCacheValues(trafmonData []byte, cdnName string, 
sampleTime int64, cache
                RetentionPolicy: config.CacheRetentionPolicy,
        })
        if err != nil {
-               errHndlr(err, ERROR)
+               log.Errorf("calculating cache metric values: creating new 
influxDB batch points: %v", err)
        }
 
        for cacheName, cacheData := range jData.Caches {
@@ -700,7 +702,7 @@ func calcCacheValues(trafmonData []byte, cdnName string, 
sampleTime int64, cache
                                statData[0].Time,
                        )
                        if err != nil {
-                               errHndlr(err, ERROR)
+                               log.Errorf("calculating cache metric values: 
error creating new influxDB point: %v", err)
                                continue
                        }
                        bps.AddPoint(pt)
@@ -738,7 +740,7 @@ func influxConnect(config StartupConfig) (influx.Client, 
error) {
                        }
                        con, err := influx.NewUDPClient(conf)
                        if err != nil {
-                               errHndlr(fmt.Errorf("An error occurred creating 
udp client. %v\n", err), ERROR)
+                               log.Errorf("An error occurred creating InfluxDB 
UDP client: %v", err)
                                continue
                        }
                        return con, nil
@@ -751,7 +753,7 @@ func influxConnect(config StartupConfig) (influx.Client, 
error) {
                }
                con, err := influx.NewHTTPClient(conf)
                if err != nil {
-                       errHndlr(fmt.Errorf("An error occurred creating HTTP 
client.  %v\n", err), ERROR)
+                       log.Errorf("An error occurred creating InfluxDB HTTP 
client: %v", err)
                        continue
                }
                //Close old connections explicitly
@@ -761,7 +763,7 @@ func influxConnect(config StartupConfig) (influx.Client, 
error) {
                host.InfluxClient = con
                _, _, err = con.Ping(10)
                if err != nil {
-                       errHndlr(err, WARN)
+                       log.Warnf("pinging InfluxDB: %v", err)
                        continue
                }
                return con, nil
@@ -770,13 +772,13 @@ func influxConnect(config StartupConfig) (influx.Client, 
error) {
        return nil, err
 }
 
-func sendMetrics(config StartupConfig, runningConfig RunningConfig, bps 
influx.BatchPoints, retry bool) {
+func sendMetrics(config StartupConfig, bps influx.BatchPoints, retry bool) {
        influxClient, err := influxConnect(config)
        if err != nil {
                if retry {
                        config.BpsChan <- bps
                }
-               errHndlr(err, ERROR)
+               log.Errorf("sending metrics to InfluxDB: unable to get InfluxDB 
client: %v", err)
                return
        }
 
@@ -791,7 +793,7 @@ func sendMetrics(config StartupConfig, runningConfig 
RunningConfig, bps influx.B
                        if retry {
                                config.BpsChan <- chunkBps
                        }
-                       errHndlr(err, ERROR)
+                       log.Errorf("sending metrics to InfluxDB: error creating 
new batch points: %v", err)
                }
                for _, p := range pts[:intMin(config.MaxPublishSize, len(pts))] 
{
                        chunkBps.AddPoint(p)
@@ -803,7 +805,7 @@ func sendMetrics(config StartupConfig, runningConfig 
RunningConfig, bps influx.B
                        if retry {
                                config.BpsChan <- chunkBps
                        }
-                       errHndlr(err, ERROR)
+                       log.Errorf("sending metrics to InfluxDB: error writing 
batch points: %v", err)
                } else {
                        log.Info(fmt.Sprintf("Sent %v stats for %v", 
len(chunkBps.Points()), chunkBps.Database()))
                }
@@ -816,10 +818,3 @@ func intMin(a, b int) int {
        }
        return b
 }
-
-func floatMax(a, b float64) float64 {
-       if a > b {
-               return a
-       }
-       return b
-}
diff --git a/traffic_stats/traffic_stats_test.go 
b/traffic_stats/traffic_stats_test.go
index 735023a..985fa76 100644
--- a/traffic_stats/traffic_stats_test.go
+++ b/traffic_stats/traffic_stats_test.go
@@ -21,6 +21,8 @@ under the License.
 
 import (
        "encoding/json"
+       "reflect"
+       "sort"
        "testing"
        "time"
 
@@ -155,3 +157,104 @@ func TestCalcCacheValues(t *testing.T) {
                }
        }
 }
+
+func TestSetHealthURLs(t *testing.T) {
+       cfg := StartupConfig{
+               StatusToMon: tc.CacheStatusOffline.String(),
+       }
+       runningCfg := RunningConfig{}
+       runningCfg.CacheMap = map[string]tc.Server{
+               "tm1": {
+                       CDNName:    "foo",
+                       DomainName: "example.org",
+                       HostName:   "tm1",
+                       Status:     tc.CacheStatusOffline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "tm2": {
+                       CDNName:    "bar",
+                       DomainName: "example.org",
+                       HostName:   "tm2",
+                       Status:     tc.CacheStatusOffline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "tm3": {
+                       CDNName:    "foo",
+                       DomainName: "example.org",
+                       HostName:   "tm3",
+                       Status:     tc.CacheStatusOffline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "tm4": {
+                       CDNName:    "bar",
+                       DomainName: "example.org",
+                       HostName:   "tm4",
+                       Status:     tc.CacheStatusOffline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "tm5": {
+                       CDNName:    "foo",
+                       DomainName: "example.org",
+                       HostName:   "tm5",
+                       Status:     tc.CacheStatusOnline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "tm6": {
+                       CDNName:    "bar",
+                       DomainName: "example.org",
+                       HostName:   "tm6",
+                       Status:     tc.CacheStatusOnline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.MonitorTypeName,
+               },
+               "cache1": {
+                       CDNName:    "foo",
+                       DomainName: "example.org",
+                       HostName:   "cache1",
+                       Status:     tc.CacheStatusOffline.String(),
+                       TCPPort:    8080,
+                       Type:       tc.CacheTypeEdge.String(),
+               },
+       }
+       setHealthURLs(cfg, &runningCfg, "/cacheStats", "/dsStats")
+       expected := map[string]map[string][]string{
+               "foo": {
+                       "CacheStats": []string{
+                               "http://tm1.example.org:8080/cacheStats";,
+                               "http://tm3.example.org:8080/cacheStats";,
+                       },
+                       "DsStats": []string{
+                               "http://tm1.example.org:8080/dsStats";,
+                               "http://tm3.example.org:8080/dsStats";,
+                       },
+               },
+               "bar": {
+                       "CacheStats": []string{
+                               "http://tm2.example.org:8080/cacheStats";,
+                               "http://tm4.example.org:8080/cacheStats";,
+                       },
+                       "DsStats": []string{
+                               "http://tm2.example.org:8080/dsStats";,
+                               "http://tm4.example.org:8080/dsStats";,
+                       },
+               },
+       }
+       for _, toBeSorted := range []map[string]map[string][]string{
+               expected,
+               runningCfg.HealthUrls,
+       } {
+               for _, healthURLs := range toBeSorted {
+                       for _, urls := range healthURLs {
+                               sort.Strings(urls)
+                       }
+               }
+       }
+       if !reflect.DeepEqual(expected, runningCfg.HealthUrls) {
+               t.Errorf("expected: %+v, actual: %+v", expected, 
runningCfg.HealthUrls)
+       }
+}

Reply via email to