This is an automated email from the ASF dual-hosted git repository.
mitchell852 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git
The following commit(s) were added to refs/heads/master by this push:
new b0dfe23 Make Traffic Stats fail over to other Traffic Monitors (#5983)
b0dfe23 is described below
commit b0dfe231a05376bec982eddd884de4165cb82ba3
Author: Rawlin Peters <[email protected]>
AuthorDate: Tue Jul 6 09:25:16 2021 -0600
Make Traffic Stats fail over to other Traffic Monitors (#5983)
* Make Traffic Stats retry another Traffic Monitor if one is down
* Cleanup, add changelog, test
---
CHANGELOG.md | 1 +
traffic_stats/traffic_stats.go | 171 +++++++++++++++++-------------------
traffic_stats/traffic_stats_test.go | 103 ++++++++++++++++++++++
3 files changed, 187 insertions(+), 88 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 33ba8ff..0ccb3dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -64,6 +64,7 @@ The format is based on [Keep a
Changelog](http://keepachangelog.com/en/1.0.0/).
- [#5407](https://github.com/apache/trafficcontrol/issues/5407) - Make sure
that you cannot add two servers with identical content
- [#2881](https://github.com/apache/trafficcontrol/issues/2881) - Some API
endpoints have incorrect Content-Types
- [#5863](https://github.com/apache/trafficcontrol/issues/5863) - Traffic
Monitor logs warnings to `log_location_info` instead of `log_location_warning`
+- [#5942](https://github.com/apache/trafficcontrol/issues/5942) - Traffic
Stats does not fail over to another Traffic Monitor when one stops responding
- [#5363](https://github.com/apache/trafficcontrol/issues/5363) - Postgresql
version changeable by env variable
- [#5405](https://github.com/apache/trafficcontrol/issues/5405) - Prevent
Tenant update from choosing child as new parent
- [#5384](https://github.com/apache/trafficcontrol/issues/5384) - New grids
will now properly remember the current page number.
diff --git a/traffic_stats/traffic_stats.go b/traffic_stats/traffic_stats.go
index 5088a19..e2e4707 100644
--- a/traffic_stats/traffic_stats.go
+++ b/traffic_stats/traffic_stats.go
@@ -37,8 +37,8 @@ import (
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/lib/go-util"
-
client "github.com/apache/trafficcontrol/traffic_ops/v2-client"
+
log "github.com/cihub/seelog"
influx "github.com/influxdata/influxdb/client/v2"
)
@@ -47,15 +47,6 @@ const UserAgent = "traffic-stats"
const TrafficOpsRequestTimeout = time.Second * time.Duration(10)
const (
- // FATAL will exit after printing error
- FATAL = iota
- // ERROR will just keep going, print error
- ERROR
- // WARN will keep going and print a warning
- WARN
-)
-
-const (
defaultPollingInterval = 10
defaultDailySummaryPollingInterval = 60
defaultConfigInterval = 300
@@ -88,8 +79,8 @@ type StartupConfig struct {
// RunningConfig is used to store runtime configuration for Traffic Stats.
This includes information
// about caches, cachegroups, and health urls
type RunningConfig struct {
- HealthUrls map[string]map[string]string // the 1st map key is
CDN_name, the second is DsStats or CacheStats
- CacheMap map[string]tc.Server // map hostName to cache
+ HealthUrls map[string]map[string][]string // the 1st map key is
CDN_name, the second is DsStats or CacheStats
+ CacheMap map[string]tc.Server // map hostName to cache
LastSummaryTime time.Time
}
@@ -115,11 +106,17 @@ func main() {
configFile := flag.String("cfg", "", "The config file")
flag.Parse()
+ if *configFile == "" {
+ flag.Usage()
+ panic("-cfg is required")
+ }
config, err = loadStartupConfig(*configFile, config)
if err != nil {
- errHndlr(err, FATAL)
+ err = fmt.Errorf("could not load startup config: %v", err)
+ log.Error(err)
+ panic(err)
}
Bps = make(map[string]influx.BatchPoints)
@@ -145,7 +142,7 @@ func main() {
log.Info("HUP Received - reloading config")
newConfig, err := loadStartupConfig(*configFile, config)
if err != nil {
- errHndlr(err, ERROR)
+ log.Errorf("could not load startup config: %v",
err)
} else {
config = newConfig
tickers = setTimers(config)
@@ -153,12 +150,12 @@ func main() {
case <-termChan:
log.Info("Shutdown Request Received - Sending stored
metrics then quitting")
for _, val := range Bps {
- sendMetrics(config, runningConfig, val, false)
+ sendMetrics(config, val, false)
}
os.Exit(0)
case <-tickers.Publish:
for key, val := range Bps {
- go sendMetrics(config, runningConfig, val, true)
+ go sendMetrics(config, val, true)
delete(Bps, key)
}
case runningConfig = <-configChan:
@@ -166,9 +163,9 @@ func main() {
go getToData(config, false, configChan)
case <-tickers.Poll:
for cdnName, urls := range runningConfig.HealthUrls {
- for _, url := range urls {
- log.Debug(cdnName, " -> ", url)
- go calcMetrics(cdnName, url,
runningConfig.CacheMap, config, runningConfig)
+ for _, u := range urls {
+ log.Debug(cdnName, " -> ", u)
+ go calcMetrics(cdnName, u,
runningConfig.CacheMap, config)
}
}
case now := <-tickers.DailySummary:
@@ -247,9 +244,9 @@ func loadStartupConfig(configFile string, oldConfig
StartupConfig) (StartupConfi
if len(config.InfluxURLs) == 0 {
return config, fmt.Errorf("No InfluxDB urls provided in
influxUrls, please provide at least one valid URL. e.g. \"influxUrls\":
[\"http://localhost:8086\"]")
}
- for _, url := range config.InfluxURLs {
+ for _, u := range config.InfluxURLs {
influxDBProps := InfluxDBProps{
- URL: url,
+ URL: u,
}
config.InfluxDBs = append(config.InfluxDBs, &influxDBProps)
}
@@ -274,8 +271,7 @@ func calcDailySummary(now time.Time, config StartupConfig,
runningConfig Running
// influx connection
influxClient, err := influxConnect(config)
if err != nil {
- log.Error("Could not connect to InfluxDb to get daily
summary stats!!")
- errHndlr(err, ERROR)
+ log.Errorf("could not connect to InfluxDb to get daily
summary stats: %v", err)
return
}
@@ -426,7 +422,7 @@ func writeSummaryStats(config StartupConfig, statsSummary
tc.StatsSummary) {
}
_, _, err = to.CreateSummaryStats(statsSummary)
if err != nil {
- log.Error(err)
+ log.Errorf("could not create summary stats: %v", err)
}
}
@@ -480,14 +476,29 @@ func getToData(config StartupConfig, init bool,
configChan chan RunningConfig) {
cacheStatPath = strings.Replace(cacheStatPath, "=,", "=", 1)
dsStatPath = strings.Replace(dsStatPath, "=,", "=", 1)
- runningConfig.HealthUrls = make(map[string]map[string]string)
- for _, server := range servers {
- if server.Type == "RASCAL" && server.Status !=
config.StatusToMon {
+ setHealthURLs(config, &runningConfig, cacheStatPath, dsStatPath)
+
+ lastSummaryTimeResponse, _, err :=
to.GetSummaryStatsLastUpdated(util.StrPtr("daily_maxgbps"))
+ if err != nil {
+ log.Errorf("unable to get summary stats last updated: %v", err)
+ } else if lastSummaryTimeResponse.Response.SummaryTime == nil {
+ log.Warn("unable to get last updated stats summary timestamp:
daily_maxgbps stats summary not reported yet")
+ } else {
+ runningConfig.LastSummaryTime =
*lastSummaryTimeResponse.Response.SummaryTime
+ }
+
+ configChan <- runningConfig
+}
+
+func setHealthURLs(config StartupConfig, runningConfig *RunningConfig,
cacheStatPath string, dsStatPath string) {
+ runningConfig.HealthUrls = make(map[string]map[string][]string)
+ for _, server := range runningConfig.CacheMap {
+ if server.Type == tc.MonitorTypeName && server.Status !=
config.StatusToMon {
log.Debugf("Skipping %s.%s. Looking for status %s but
got status %s", server.HostName, server.DomainName, config.StatusToMon,
server.Status)
continue
}
- if server.Type == "RASCAL" && server.Status ==
config.StatusToMon {
+ if server.Type == tc.MonitorTypeName && server.Status ==
config.StatusToMon {
cdnName := server.CDNName
if cdnName == "" {
log.Error("Unable to find CDN name for " +
server.HostName + ".. skipping")
@@ -495,62 +506,53 @@ func getToData(config StartupConfig, init bool,
configChan chan RunningConfig) {
}
if runningConfig.HealthUrls[cdnName] == nil {
- runningConfig.HealthUrls[cdnName] =
make(map[string]string)
+ runningConfig.HealthUrls[cdnName] =
make(map[string][]string)
}
- url := "http://" + server.HostName + "." +
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + cacheStatPath
- runningConfig.HealthUrls[cdnName]["CacheStats"] = url
- url = "http://" + server.HostName + "." +
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + dsStatPath
- runningConfig.HealthUrls[cdnName]["DsStats"] = url
+ healthURL := "http://" + server.HostName + "." +
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + cacheStatPath
+ runningConfig.HealthUrls[cdnName]["CacheStats"] =
append(runningConfig.HealthUrls[cdnName]["CacheStats"], healthURL)
+ healthURL = "http://" + server.HostName + "." +
server.DomainName + ":" + strconv.Itoa(server.TCPPort) + dsStatPath
+ runningConfig.HealthUrls[cdnName]["DsStats"] =
append(runningConfig.HealthUrls[cdnName]["DsStats"], healthURL)
}
}
-
- lastSummaryTimeResponse, _, err :=
to.GetSummaryStatsLastUpdated(util.StrPtr("daily_maxgbps"))
- if err != nil {
- errHndlr(err, ERROR)
- } else if lastSummaryTimeResponse.Response.SummaryTime == nil {
- errHndlr(errors.New("unable to get last updated stats summary
timestamp: daily_maxgbps stats summary not reported yet"), WARN)
- } else {
- runningConfig.LastSummaryTime =
*lastSummaryTimeResponse.Response.SummaryTime
- }
-
- configChan <- runningConfig
}
-func calcMetrics(cdnName string, url string, cacheMap map[string]tc.Server,
config StartupConfig, runningConfig RunningConfig) {
- sampleTime := int64(time.Now().Unix())
+func calcMetrics(cdnName string, urls []string, cacheMap map[string]tc.Server,
config StartupConfig) {
+ sampleTime := time.Now().Unix()
// get the data from trafficMonitor
- trafMonData, err := getURL(url)
- if err != nil {
- log.Error("Unable to connect to Traffic Monitor @ ", url, " -
skipping timeslot")
+ var trafMonData []byte
+ var err error
+ var healthURL string
+ for _, u := range urls {
+ trafMonData, err = getURL(u)
+ if err != nil {
+ log.Errorf("error getting %s stats URL %s: %v",
cdnName, u, err)
+ continue
+ }
+ healthURL = u
+ log.Infof("successfully got %s stats URL %s", cdnName, u)
+ break
+ }
+ if healthURL == "" {
+ log.Errorf("unable to get any %s stats URL - skipping
timeslot", cdnName)
return
}
- if strings.Contains(url, "CacheStats") {
+ if strings.Contains(healthURL, "CacheStats") {
err = calcCacheValues(trafMonData, cdnName, sampleTime,
cacheMap, config)
- errHndlr(err, ERROR)
- } else if strings.Contains(url, "DsStats") {
+ if err != nil {
+ log.Errorf("error calculating cache metric values for
CDN %s: %v", cdnName, err)
+ }
+ } else if strings.Contains(healthURL, "DsStats") {
err = calcDsValues(trafMonData, cdnName, sampleTime, config)
- errHndlr(err, ERROR)
- } else {
- log.Warn("Don't know what to do with ", url)
- }
-}
-
-func errHndlr(err error, severity int) {
- if err != nil {
- switch {
- case severity == WARN:
- log.Warn(err)
- case severity == ERROR:
- log.Error(err)
- case severity == FATAL:
- log.Error(err)
- panic(err)
+ if err != nil {
+ log.Errorf("error calculating delivery service metric
values for CDN %s: %v", cdnName, err)
}
+ } else {
+ log.Warn("Don't know what to do with given %s stats URL: ",
cdnName, healthURL)
}
}
-func calcDsValues(rascalData []byte, cdnName string, sampleTime int64, config
StartupConfig) error {
+func calcDsValues(tmData []byte, cdnName string, sampleTime int64, config
StartupConfig) error {
type DsStatsJSON struct {
Pp string `json:"pp"`
Date string `json:"date"`
@@ -563,7 +565,7 @@ func calcDsValues(rascalData []byte, cdnName string,
sampleTime int64, config St
}
var jData DsStatsJSON
- err := json.Unmarshal(rascalData, &jData)
+ err := json.Unmarshal(tmData, &jData)
if err != nil {
return fmt.Errorf("could not unmarshall deliveryservice stats
JSON - %v", err)
}
@@ -606,7 +608,7 @@ func calcDsValues(rascalData []byte, cdnName string,
sampleTime int64, config St
statTime := strconv.Itoa(dsMetricData[0].Time)
msInt, err := strconv.ParseInt(statTime, 10, 64)
if err != nil {
- errHndlr(err, ERROR)
+ log.Errorf("calculating delivery service metric
values: error parsing stat time: %v", err)
}
newTime := time.Unix(0, msInt*int64(time.Millisecond))
@@ -626,7 +628,7 @@ func calcDsValues(rascalData []byte, cdnName string,
sampleTime int64, config St
newTime,
)
if err != nil {
- errHndlr(err, ERROR)
+ log.Errorf("calculating delivery service metric
values: error creating new influxDB point: %v", err)
continue
}
bps.AddPoint(pt)
@@ -652,7 +654,7 @@ func calcCacheValues(trafmonData []byte, cdnName string,
sampleTime int64, cache
RetentionPolicy: config.CacheRetentionPolicy,
})
if err != nil {
- errHndlr(err, ERROR)
+ log.Errorf("calculating cache metric values: creating new
influxDB batch points: %v", err)
}
for cacheName, cacheData := range jData.Caches {
@@ -700,7 +702,7 @@ func calcCacheValues(trafmonData []byte, cdnName string,
sampleTime int64, cache
statData[0].Time,
)
if err != nil {
- errHndlr(err, ERROR)
+ log.Errorf("calculating cache metric values:
error creating new influxDB point: %v", err)
continue
}
bps.AddPoint(pt)
@@ -738,7 +740,7 @@ func influxConnect(config StartupConfig) (influx.Client,
error) {
}
con, err := influx.NewUDPClient(conf)
if err != nil {
- errHndlr(fmt.Errorf("An error occurred creating
udp client. %v\n", err), ERROR)
+ log.Errorf("An error occurred creating InfluxDB
UDP client: %v", err)
continue
}
return con, nil
@@ -751,7 +753,7 @@ func influxConnect(config StartupConfig) (influx.Client,
error) {
}
con, err := influx.NewHTTPClient(conf)
if err != nil {
- errHndlr(fmt.Errorf("An error occurred creating HTTP
client. %v\n", err), ERROR)
+ log.Errorf("An error occurred creating InfluxDB HTTP
client: %v", err)
continue
}
//Close old connections explicitly
@@ -761,7 +763,7 @@ func influxConnect(config StartupConfig) (influx.Client,
error) {
host.InfluxClient = con
_, _, err = con.Ping(10)
if err != nil {
- errHndlr(err, WARN)
+ log.Warnf("pinging InfluxDB: %v", err)
continue
}
return con, nil
@@ -770,13 +772,13 @@ func influxConnect(config StartupConfig) (influx.Client,
error) {
return nil, err
}
-func sendMetrics(config StartupConfig, runningConfig RunningConfig, bps
influx.BatchPoints, retry bool) {
+func sendMetrics(config StartupConfig, bps influx.BatchPoints, retry bool) {
influxClient, err := influxConnect(config)
if err != nil {
if retry {
config.BpsChan <- bps
}
- errHndlr(err, ERROR)
+ log.Errorf("sending metrics to InfluxDB: unable to get InfluxDB
client: %v", err)
return
}
@@ -791,7 +793,7 @@ func sendMetrics(config StartupConfig, runningConfig
RunningConfig, bps influx.B
if retry {
config.BpsChan <- chunkBps
}
- errHndlr(err, ERROR)
+ log.Errorf("sending metrics to InfluxDB: error creating
new batch points: %v", err)
}
for _, p := range pts[:intMin(config.MaxPublishSize, len(pts))]
{
chunkBps.AddPoint(p)
@@ -803,7 +805,7 @@ func sendMetrics(config StartupConfig, runningConfig
RunningConfig, bps influx.B
if retry {
config.BpsChan <- chunkBps
}
- errHndlr(err, ERROR)
+ log.Errorf("sending metrics to InfluxDB: error writing
batch points: %v", err)
} else {
log.Info(fmt.Sprintf("Sent %v stats for %v",
len(chunkBps.Points()), chunkBps.Database()))
}
@@ -816,10 +818,3 @@ func intMin(a, b int) int {
}
return b
}
-
-func floatMax(a, b float64) float64 {
- if a > b {
- return a
- }
- return b
-}
diff --git a/traffic_stats/traffic_stats_test.go
b/traffic_stats/traffic_stats_test.go
index 735023a..985fa76 100644
--- a/traffic_stats/traffic_stats_test.go
+++ b/traffic_stats/traffic_stats_test.go
@@ -21,6 +21,8 @@ under the License.
import (
"encoding/json"
+ "reflect"
+ "sort"
"testing"
"time"
@@ -155,3 +157,104 @@ func TestCalcCacheValues(t *testing.T) {
}
}
}
+
+func TestSetHealthURLs(t *testing.T) {
+ cfg := StartupConfig{
+ StatusToMon: tc.CacheStatusOffline.String(),
+ }
+ runningCfg := RunningConfig{}
+ runningCfg.CacheMap = map[string]tc.Server{
+ "tm1": {
+ CDNName: "foo",
+ DomainName: "example.org",
+ HostName: "tm1",
+ Status: tc.CacheStatusOffline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "tm2": {
+ CDNName: "bar",
+ DomainName: "example.org",
+ HostName: "tm2",
+ Status: tc.CacheStatusOffline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "tm3": {
+ CDNName: "foo",
+ DomainName: "example.org",
+ HostName: "tm3",
+ Status: tc.CacheStatusOffline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "tm4": {
+ CDNName: "bar",
+ DomainName: "example.org",
+ HostName: "tm4",
+ Status: tc.CacheStatusOffline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "tm5": {
+ CDNName: "foo",
+ DomainName: "example.org",
+ HostName: "tm5",
+ Status: tc.CacheStatusOnline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "tm6": {
+ CDNName: "bar",
+ DomainName: "example.org",
+ HostName: "tm6",
+ Status: tc.CacheStatusOnline.String(),
+ TCPPort: 8080,
+ Type: tc.MonitorTypeName,
+ },
+ "cache1": {
+ CDNName: "foo",
+ DomainName: "example.org",
+ HostName: "cache1",
+ Status: tc.CacheStatusOffline.String(),
+ TCPPort: 8080,
+ Type: tc.CacheTypeEdge.String(),
+ },
+ }
+ setHealthURLs(cfg, &runningCfg, "/cacheStats", "/dsStats")
+ expected := map[string]map[string][]string{
+ "foo": {
+ "CacheStats": []string{
+ "http://tm1.example.org:8080/cacheStats",
+ "http://tm3.example.org:8080/cacheStats",
+ },
+ "DsStats": []string{
+ "http://tm1.example.org:8080/dsStats",
+ "http://tm3.example.org:8080/dsStats",
+ },
+ },
+ "bar": {
+ "CacheStats": []string{
+ "http://tm2.example.org:8080/cacheStats",
+ "http://tm4.example.org:8080/cacheStats",
+ },
+ "DsStats": []string{
+ "http://tm2.example.org:8080/dsStats",
+ "http://tm4.example.org:8080/dsStats",
+ },
+ },
+ }
+ for _, toBeSorted := range []map[string]map[string][]string{
+ expected,
+ runningCfg.HealthUrls,
+ } {
+ for _, healthURLs := range toBeSorted {
+ for _, urls := range healthURLs {
+ sort.Strings(urls)
+ }
+ }
+ }
+ if !reflect.DeepEqual(expected, runningCfg.HealthUrls) {
+ t.Errorf("expected: %+v, actual: %+v", expected,
runningCfg.HealthUrls)
+ }
+}