This is an automated email from the ASF dual-hosted git repository.
mattjackson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficcontrol.git
The following commit(s) were added to refs/heads/master by this push:
new f2a6b5b Add multiple interface support to TM
CacheStats/cache-statuses (#4690)
f2a6b5b is described below
commit f2a6b5ba360f181450bc85a1c7a5d58832361037
Author: Steve Hamrick <[email protected]>
AuthorDate: Thu Jun 18 09:03:48 2020 -0600
Add multiple interface support to TM CacheStats/cache-statuses (#4690)
* Handle cache-statuses correctly
* Update docs and fix issue with type
* Fix issue with status
* Fix typo
* fix issue
* Woops
* Don't calculate aggregate every loop, move localState code out of loop
* Update changelog
---
CHANGELOG.md | 1 +
.../traffic_monitor/traffic_monitor_api.rst | 149 +++++++++++++++--
lib/go-tc/enum.go | 3 +
traffic_monitor/cache/cache.go | 17 +-
traffic_monitor/cache/data.go | 16 +-
traffic_monitor/cache/data_test.go | 42 ++---
traffic_monitor/datareq/cachestate.go | 135 ++++++++++------
traffic_monitor/datareq/statsummary.go | 9 +-
traffic_monitor/health/cache.go | 177 +++++++++++++++------
traffic_monitor/health/cache_test.go | 45 ++++--
traffic_monitor/threadsafe/cacheavailablestatus.go | 2 +-
traffic_monitor/threadsafe/resultstathistory.go | 153 +++++++++++-------
.../threadsafe/resultstathistory_test.go | 3 +-
.../traffic_ops_golang/deliveryservice/capacity.go | 16 +-
14 files changed, 550 insertions(+), 218 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 811a165..eca4ac1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,7 @@ The format is based on [Keep a
Changelog](http://keepachangelog.com/en/1.0.0/).
- Changed some Traffic Ops Go Client methods to use `DeliveryServiceNullable`
inputs and outputs.
- Changed Traffic Portal to use Traffic Ops API v3
- Changed ORT Config Generation to be deterministic, which will prevent
spurious diffs when nothing actually changed.
+- Changed the `/publish/CacheStats` in Traffic Monitor to support multiple
interfaces.
- Changed the access logs in Traffic Ops to now show the route ID with every
API endpoint call. The Route ID is appended to the end of the access log line.
- With the addition of multiple server interfaces, interface data is
constructed from IP Address/Gateway/Netmask (and their IPv6 counterparts) and
Interface Name and Interface MTU fields on services. These **MUST** have
proper, valid data before attempting to upgrade or the upgrade **WILL** fail.
In particular IP fields need to be valid IP addresses/netmasks, and MTU must
only be positive integers of at least 1280.
- The `/servers` and `/servers/{{ID}}}` API endpoints have been updated to use
and reflect multi-interface servers.
diff --git a/docs/source/development/traffic_monitor/traffic_monitor_api.rst
b/docs/source/development/traffic_monitor/traffic_monitor_api.rst
index 46a1067..14a2316 100644
--- a/docs/source/development/traffic_monitor/traffic_monitor_api.rst
+++ b/docs/source/development/traffic_monitor/traffic_monitor_api.rst
@@ -82,6 +82,13 @@ Request Structure
| | | treated as partial strings.
|
+--------------+---------+------------------------------------------------+
+.. code-block:: http
+ :caption: Example Request
+
+ GET /publish/CacheStats HTTP/1.1
+ Accept: */*
+ Content-Type: application/json
+
Response Structure
""""""""""""""""""
:pp: Stores any request parameters provided, as a string
@@ -90,16 +97,86 @@ Response Structure
:<server name>: Each server's object is a collection of keys that are
the names of statistics
- :<statistic name>: The name of the statistic which this array
represents. Each value in the array is one (and usually only one) object with
the following structure:
+ :<interface name>: The name of the network interface under the
same server
- :value: The statistic's value. This is *always* a
string, even if that string only contains a number.
- :time: An integer UNIX timestamp indicating the start
time for this value of this statistic
- :span: The span of time - in milliseconds - for which
this value is valid. This is determined by the polling interval for the
statistic
+ :<statistic name>: The name of the statistic which this
array represents. Each value in the array is one (and usually only one) object
with the following structure:
-.. code-block:: json
+ :value: The statistic's value. This is *always*
a string, even if that string only contains a number.
+ :time: An integer UNIX timestamp indicating the
start time for this value of this statistic
+ :span: The span of time - in milliseconds - for
which this value is valid. This is determined by the polling interval for the
statistic
+
+.. code-block:: http
:caption: Example Response
- {}
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ Date: Thu, 14 May 2020 15:48:55 GMT
+ Transfer-Encoding: chunked
+
+ {
+ "pp": "",
+ "date": "Thu May 14 15:48:55 UTC 2020",
+ "caches": {
+ "mid": {
+ "eth0": {
+
"ats.proxy.process.ssl.cipher.user_agent.PSK-AES256-GCM-SHA384": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ },
+ "aggregate": {
+
"ats.proxy.process.http.milestone.server_begin_write": [
+ {
+ "value": "174",
+ "time": 1589471325624,
+ "span": 1
+ }
+ ]
+ },
+ "lo": {
+
"ats.proxy.node.http.transaction_counts_avg_10s.miss_changed": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ }
+ },
+ "edge": {
+ "eth0": {
+
"ats.proxy.process.ssl.cipher.user_agent.PSK-AES256-GCM-SHA384": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ },
+ "aggregate": {
+
"ats.proxy.process.http.milestone.server_begin_write": [
+ {
+ "value": "174",
+ "time": 1589471325624,
+ "span": 1
+ }
+ ]
+ },
+ "lo": {
+
"ats.proxy.node.http.transaction_counts_avg_10s.miss_changed": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ }
+ }
+ }
+ }
``publish/CacheStats/{{cache}}``
================================
@@ -132,6 +209,13 @@ Request Structure
| | | treated as partial strings.
|
+--------------+---------+------------------------------------------------+
+.. code-block:: http
+ :caption: Example Request
+
+     GET /publish/CacheStats/mid HTTP/1.1
+ Accept: */*
+ Content-Type: application/json
+
Response Structure
""""""""""""""""""
:pp: Stores any request parameters provided, as a string
@@ -140,16 +224,57 @@ Response Structure
:<server name>: The requested server's object is a collection of keys
that are the names of statistics
- :<statistic name>: The name of the statistic which this array
represents. Each value in the array is one (and usually only one) object with
the following structure:
+ :<interface name>: The name of the network interface under the
same server
- :value: The statistic's value. This is *always* a
string, even if that string only contains a number.
- :time: An integer UNIX timestamp indicating the start
time for this value of this statistic
- :span: The span of time - in milliseconds - for which
this value is valid. This is determined by the polling interval for the
statistic
+ :<statistic name>: The name of the statistic which this
array represents. Each value in the array is one (and usually only one) object
with the following structure:
-.. code-block:: json
+ :value: The statistic's value. This is *always*
a string, even if that string only contains a number.
+ :time: An integer UNIX timestamp indicating the
start time for this value of this statistic
+ :span: The span of time - in milliseconds - for
which this value is valid. This is determined by the polling interval for the
statistic
+
+.. code-block:: http
:caption: Example Response
- {}
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ Date: Thu, 14 May 2020 15:54:35 GMT
+ Transfer-Encoding: chunked
+
+ {
+ "pp": "",
+ "date": "Thu May 14 15:48:55 UTC 2020",
+ "caches": {
+ "mid": {
+ "eth0": {
+
"ats.proxy.process.ssl.cipher.user_agent.PSK-AES256-GCM-SHA384": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ },
+ "aggregate": {
+
"ats.proxy.process.http.milestone.server_begin_write": [
+ {
+ "value": "174",
+ "time": 1589471325624,
+ "span": 1
+ }
+ ]
+ },
+ "lo": {
+
"ats.proxy.node.http.transaction_counts_avg_10s.miss_changed": [
+ {
+ "value": "0",
+ "time": 1589471325624,
+ "span": 99
+ }
+ ]
+ }
+ }
+ }
+ }
``/publish/DsStats``
====================
diff --git a/lib/go-tc/enum.go b/lib/go-tc/enum.go
index 0cb8e44..f9614e7 100644
--- a/lib/go-tc/enum.go
+++ b/lib/go-tc/enum.go
@@ -176,6 +176,9 @@ const DSProtocolHTTPS = 1
const DSProtocolHTTPAndHTTPS = 2
const DSProtocolHTTPToHTTPS = 3
+// CacheInterfacesAggregate represents the interface that stores aggregate data
for each interface on the cache
+const CacheInterfacesAggregate = "aggregate"
+
// CacheStatus represents the Traffic Server status set in Traffic Ops
(online, offline, admin_down, reported). The string values of this type should
match the Traffic Ops values.
type CacheStatus string
diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go
index 0f9ddcd..a348970 100644
--- a/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/cache/cache.go
@@ -129,6 +129,20 @@ func (result *Result) HasStat(stat string) bool {
return false
}
+// Returns the names of all interfaces including the aggregate
+func (result *Result) InterfacesNames() []string {
+ interfaceNames := []string{tc.CacheInterfacesAggregate}
+ for name, _ := range result.Statistics.Interfaces {
+ interfaceNames = append(interfaceNames, name)
+ }
+ return interfaceNames
+}
+
+// Returns the interfaces assigned to this result
+func (result *Result) Interfaces() map[string]Interface {
+ return result.Statistics.Interfaces
+}
+
// Vitals is the vitals data returned from a cache.
type Vitals struct {
// LoadAvg is the one-minute "loadavg" of the cache server.
@@ -148,7 +162,7 @@ type Stat struct {
// Stats is designed for returning via the API. It contains result history for
each cache, as well as common API data.
type Stats struct {
srvhttp.CommonAPIData
- Caches map[tc.CacheName]map[string][]ResultStatVal `json:"caches"`
+ Caches map[tc.CacheName]map[string]map[string][]ResultStatVal
`json:"caches"`
}
// Filter filters whether stats and caches should be returned from a data set.
@@ -168,7 +182,6 @@ func ComputedStats() map[string]StatComputeFunc {
"availableBandwidthInKbps": func(info ResultInfo, serverInfo
tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable)
interface{} {
return info.Vitals.MaxKbpsOut - info.Vitals.KbpsOut
},
-
"availableBandwidthInMbps": func(info ResultInfo, serverInfo
tc.TrafficServer, serverProfile tc.TMProfile, combinedState tc.IsAvailable)
interface{} {
return (info.Vitals.MaxKbpsOut - info.Vitals.KbpsOut) /
1000
},
diff --git a/traffic_monitor/cache/data.go b/traffic_monitor/cache/data.go
index f307dce..84a6b08 100644
--- a/traffic_monitor/cache/data.go
+++ b/traffic_monitor/cache/data.go
@@ -25,7 +25,7 @@ import (
"github.com/apache/trafficcontrol/lib/go-tc"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
// AvailableStatusReported is the status string returned by caches set to
@@ -61,7 +61,7 @@ type AvailableStatus struct {
// The name of the actual status the cache server has, as configured in
// Traffic Ops.
Status string
- // Why will contain the reason a cache server has been purposely marked
+ // `Why` will contain the reason a cache server has been purposely
marked
// unavailable by a Traffic Ops operator, if indeed that has occurred.
Why string
// UnavailableStat is the stat whose threshold made the cache
unavailable.
@@ -75,14 +75,18 @@ type AvailableStatus struct {
}
// CacheAvailableStatuses is the available status of each cache.
-type AvailableStatuses map[tc.CacheName]AvailableStatus
+type AvailableStatuses map[tc.CacheName]map[string]AvailableStatus
// Copy copies this CacheAvailableStatuses. It does not modify, and thus is
// safe for multiple reader goroutines.
func (a AvailableStatuses) Copy() AvailableStatuses {
- b := AvailableStatuses(map[tc.CacheName]AvailableStatus{})
- for k, v := range a {
- b[k] = v
+ b := AvailableStatuses(map[tc.CacheName]map[string]AvailableStatus{})
+ for cacheName, interfaces := range a {
+ interfaceStats := make(map[string]AvailableStatus)
+ for interfaceName, status := range interfaces {
+ interfaceStats[interfaceName] = status
+ }
+ b[cacheName] = interfaceStats
}
return b
}
diff --git a/traffic_monitor/cache/data_test.go
b/traffic_monitor/cache/data_test.go
index 4c4c54f..9f6f47a 100644
--- a/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/cache/data_test.go
@@ -49,29 +49,13 @@ func randAvailableStatuses() AvailableStatuses {
a := AvailableStatuses{}
num := 100
for i := 0; i < num; i++ {
- a[tc.CacheName(randStr())] = AvailableStatus{Available:
AvailableTuple{randBool(), randBool()}, Status: randStr()}
+ cacheName := tc.CacheName(randStr())
+ a[cacheName] = make(map[string]AvailableStatus)
+ a[cacheName][randStr()] = AvailableStatus{Available:
AvailableTuple{randBool(), randBool()}, Status: randStr()}
}
return a
}
-func TestAvailableStatusesCopy(t *testing.T) {
- num := 100
- for i := 0; i < num; i++ {
- a := randAvailableStatuses()
- b := a.Copy()
-
- if !reflect.DeepEqual(a, b) {
- t.Errorf("expected a and b DeepEqual, actual copied map
not equal: a: %v b: %v", a, b)
- }
-
- // verify a and b don't point to the same map
- a[tc.CacheName(randStr())] = AvailableStatus{Available:
AvailableTuple{randBool(), randBool()}, Status: randStr()}
- if reflect.DeepEqual(a, b) {
- t.Errorf("expected a != b, actual a and b point to the
same map: a: %+v", a)
- }
- }
-}
-
func randStrIfaceMap() map[string]interface{} {
m := map[string]interface{}{}
num := 5
@@ -248,3 +232,23 @@ func TestResultHistoryCopy(t *testing.T) {
}
}
}
+
+func TestAvailableStatusesCopy(t *testing.T) {
+ num := 100
+ for i := 0; i < num; i++ {
+ a := randAvailableStatuses()
+ b := a.Copy()
+
+ if !reflect.DeepEqual(a, b) {
+ t.Errorf("expected a and b DeepEqual, actual copied map
not equal: a: %v b: %v", a, b)
+ }
+
+ cacheName := tc.CacheName(randStr())
+ a[cacheName] = make(map[string]AvailableStatus)
+ // verify a and b don't point to the same map
+ a[cacheName][randStr()] = AvailableStatus{Available:
AvailableTuple{randBool(), randBool()}, Status: randStr()}
+ if reflect.DeepEqual(a, b) {
+ t.Errorf("expected a != b, actual a and b point to the
same map: a: %+v", a)
+ }
+ }
+}
diff --git a/traffic_monitor/datareq/cachestate.go
b/traffic_monitor/datareq/cachestate.go
index 758f04a..8ceb036 100644
--- a/traffic_monitor/datareq/cachestate.go
+++ b/traffic_monitor/datareq/cachestate.go
@@ -32,16 +32,14 @@ import (
"github.com/apache/trafficcontrol/traffic_monitor/threadsafe"
"github.com/apache/trafficcontrol/traffic_monitor/todata"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
// CacheStatus contains summary stat data about the given cache.
// TODO make fields nullable, so error fields can be omitted, letting API
callers still get updates for unerrored fields
type CacheStatus struct {
- Type *string `json:"type,omitempty"`
- Status *string `json:"status,omitempty"`
- StatusPoller *string `json:"status_poller,omitempty"`
- LoadAverage *float64 `json:"load_average,omitempty"`
+ Type *string `json:"type,omitempty"`
+ LoadAverage *float64 `json:"load_average,omitempty"`
// QueryTimeMilliseconds is the time it took this app to perform a
complete query and process the data, end-to-end, for the latest health query.
QueryTimeMilliseconds *int64 `json:"query_time_ms,omitempty"`
// HealthTimeMilliseconds is the time it took to make the HTTP request
and get back the full response, for the latest health query.
@@ -51,13 +49,29 @@ type CacheStatus struct {
// StatSpanMilliseconds is the length of time between completing the
most recent two stat queries. This can be used as a rough gauge of the
end-to-end query processing time.
StatSpanMilliseconds *int64 `json:"stat_span_ms,omitempty"`
// HealthSpanMilliseconds is the length of time between completing the
most recent two health queries. This can be used as a rough gauge of the
end-to-end query processing time.
- HealthSpanMilliseconds *int64 `json:"health_span_ms,omitempty"`
- BandwidthKbps *float64 `json:"bandwidth_kbps,omitempty"`
- BandwidthCapacityKbps *float64
`json:"bandwidth_capacity_kbps,omitempty"`
- ConnectionCount *int64 `json:"connection_count,omitempty"`
- IPv4Available *bool `json:"ipv4_available,omitempty"`
- IPv6Available *bool `json:"ipv6_available,omitempty"`
- CombinedAvailable *bool `json:"combined_available,omitempty"`
+ HealthSpanMilliseconds *int64 `json:"health_span_ms,omitempty"`
+
+ Status *string `json:"status,omitempty"`
+ StatusPoller *string `json:"status_poller,omitempty"`
+ BandwidthKbps *float64 `json:"bandwidth_kbps,omitempty"`
+ BandwidthCapacityKbps *float64
`json:"bandwidth_capacity_kbps,omitempty"`
+ ConnectionCount *int64 `json:"connection_count,omitempty"`
+ IPv4Available *bool `json:"ipv4_available,omitempty"`
+ IPv6Available *bool `json:"ipv6_available,omitempty"`
+ CombinedAvailable *bool `json:"combined_available,omitempty"`
+
+ Interfaces *map[string]CacheInterfaceStatus
`json:"interfaces,omitempty"`
+}
+
+type CacheInterfaceStatus struct {
+ Status *string `json:"status,omitempty"`
+ StatusPoller *string `json:"status_poller,omitempty"`
+ BandwidthKbps *float64 `json:"bandwidth_kbps,omitempty"`
+ BandwidthCapacityKbps *float64
`json:"bandwidth_capacity_kbps,omitempty"`
+ ConnectionCount *int64 `json:"connection_count,omitempty"`
+ IPv4Available *bool `json:"ipv4_available,omitempty"`
+ IPv6Available *bool `json:"ipv6_available,omitempty"`
+ CombinedAvailable *bool `json:"combined_available,omitempty"`
}
func srvAPICacheStates(
@@ -95,7 +109,51 @@ func createCacheStatuses(
for cacheNameStr, serverInfo := range servers {
cacheName := tc.CacheName(cacheNameStr)
- status, statusPoller, ipv4, ipv6, combinedStatus :=
cacheStatusAndPoller(cacheName, serverInfo, localCacheStatus)
+ interfaceStatus := make(map[string]CacheInterfaceStatus)
+
+ totalKbps := float64(0)
+ totalMaxKbps := float64(0)
+ totalConnections := int64(0)
+ for interfaceName, _ := range localCacheStatus[cacheName] {
+ if interfaceName == tc.CacheInterfacesAggregate {
+ continue
+ }
+ status, statusPoller, ipv4, ipv6, combinedStatus :=
cacheStatusAndPoller(cacheName, interfaceName, serverInfo, localCacheStatus)
+ var kbps float64
+ if lastStat, ok := lastStats.Caches[cacheName]; !ok {
+ log.Infof("cache not in last kbps cache %s\n",
cacheName)
+ } else {
+ kbps = lastStat.Bytes.PerSec /
float64(ds.BytesPerKilobit)
+ totalKbps += kbps
+ }
+
+ var maxKbps float64
+ if v, ok := maxKbpses[cacheName]; !ok {
+ log.Infof("cache not in max kbps cache %s\n",
cacheName)
+ } else {
+ maxKbps = float64(v)
+ totalMaxKbps += maxKbps
+ }
+
+ var connections int64
+ connectionsVal, ok := conns[cacheName]
+ if !ok {
+ log.Infof("cache not in connections %s\n",
cacheName)
+ } else {
+ totalConnections += connectionsVal
+ connections = connectionsVal
+ }
+ interfaceStatus[interfaceName] = CacheInterfaceStatus{
+ Status: &status,
+ StatusPoller: &statusPoller,
+ BandwidthKbps: &kbps,
+ BandwidthCapacityKbps: &maxKbps,
+ ConnectionCount: &connections,
+ IPv4Available: &ipv4,
+ IPv6Available: &ipv6,
+ CombinedAvailable: &combinedStatus,
+ }
+ }
cacheTypeStr := ""
if cacheType, ok := cacheTypes[cacheName]; !ok {
@@ -138,30 +196,7 @@ func createCacheStatuses(
log.Infof("Error getting cache %v health span: %v\n",
cacheName, err)
}
- var kbps *float64
- if lastStat, ok := lastStats.Caches[cacheName]; !ok {
- log.Infof("cache not in last kbps cache %s\n",
cacheName)
- } else {
- kbpsVal := lastStat.Bytes.PerSec /
float64(ds.BytesPerKilobit)
- kbps = &kbpsVal
- }
-
- var maxKbps *float64
- if v, ok := maxKbpses[cacheName]; !ok {
- log.Infof("cache not in max kbps cache %s\n", cacheName)
- } else {
- fv := float64(v)
- maxKbps = &fv
- }
-
- var connections *int64
- connectionsVal, ok := conns[cacheName]
- if !ok {
- log.Infof("cache not in connections %s\n", cacheName)
- } else {
- connections = &connectionsVal
- }
-
+ status, statusPoller, ipv4, ipv6, combinedStatus :=
cacheStatusAndPoller(cacheName, tc.CacheInterfacesAggregate, serverInfo,
localCacheStatus)
statii[cacheName] = CacheStatus{
Type: &cacheTypeStr,
LoadAverage: &loadAverage,
@@ -170,14 +205,15 @@ func createCacheStatuses(
HealthTimeMilliseconds: &healthTime,
StatSpanMilliseconds: &statSpan,
HealthSpanMilliseconds: &healthSpan,
- BandwidthKbps: kbps,
- BandwidthCapacityKbps: maxKbps,
- ConnectionCount: connections,
+ BandwidthKbps: &totalKbps,
+ BandwidthCapacityKbps: &totalMaxKbps,
+ ConnectionCount: &totalConnections,
Status: &status,
StatusPoller: &statusPoller,
IPv4Available: &ipv4,
IPv6Available: &ipv6,
CombinedAvailable: &combinedStatus,
+ Interfaces: &interfaceStatus,
}
}
return statii
@@ -185,7 +221,7 @@ func createCacheStatuses(
//cacheStatusAndPoller returns the reason why a cache is unavailable (or
that it is available), the poller, and 3 booleans in order:
// IPv4 availability, IPv6 availability and Processed availability which is
what the monitor reports based on the PollingProtocol chosen (ipv4only,ipv6only
or both)
-func cacheStatusAndPoller(server tc.CacheName, serverInfo tc.TrafficServer,
localCacheStatus cache.AvailableStatuses) (string, string, bool, bool, bool) {
+func cacheStatusAndPoller(server tc.CacheName, interfaceName string,
serverInfo tc.TrafficServer, localCacheStatus cache.AvailableStatuses) (string,
string, bool, bool, bool) {
switch status := tc.CacheStatusFromString(serverInfo.ServerStatus);
status {
case tc.CacheStatusAdminDown:
fallthrough
@@ -195,14 +231,18 @@ func cacheStatusAndPoller(server tc.CacheName, serverInfo
tc.TrafficServer, loca
return status.String(), "", false, false, false
}
- statusVal, ok := localCacheStatus[server]
- if !ok {
+ if _, ok := localCacheStatus[server]; !ok {
log.Infof("cache not in statuses %s\n", server)
return "ERROR - not in statuses", "", false, false, false
}
+ if _, ok := localCacheStatus[server][interfaceName]; !ok {
+ log.Infof("interface %s not in cache %s", interfaceName, server)
+ return "ERROR - not in statuses", "", false, false, false
+ }
+ statusVal := localCacheStatus[server][interfaceName]
if statusVal.Why != "" {
- return statusVal.Why, statusVal.Poller,
statusVal.Available.IPv4, statusVal.Available.IPv6, statusVal.ProcessedAvailable
+ return fmt.Sprintf("%s", statusVal.Why), statusVal.Poller,
statusVal.Available.IPv4, statusVal.Available.IPv6, statusVal.ProcessedAvailable
}
if statusVal.ProcessedAvailable {
return fmt.Sprintf("%s - available", statusVal.Status),
statusVal.Poller, statusVal.Available.IPv4, statusVal.Available.IPv6,
statusVal.ProcessedAvailable
@@ -212,8 +252,11 @@ func cacheStatusAndPoller(server tc.CacheName, serverInfo
tc.TrafficServer, loca
func createCacheConnections(statResultHistory threadsafe.ResultStatHistory)
map[tc.CacheName]int64 {
conns := map[tc.CacheName]int64{}
- statResultHistory.Range(func(server tc.CacheName, history
threadsafe.ResultStatValHistory) bool {
- // for server, history := range statResultHistory {
+ statResultHistory.Range(func(server tc.CacheName, interf string,
history threadsafe.ResultStatValHistory) bool {
+ // We only want to create connections for each cache
+ if interf != tc.CacheInterfacesAggregate {
+ return true
+ }
vals :=
history.Load("proxy.process.http.current_client_connections")
if len(vals) == 0 {
return true
diff --git a/traffic_monitor/datareq/statsummary.go
b/traffic_monitor/datareq/statsummary.go
index e3c31b3..7fb938d 100644
--- a/traffic_monitor/datareq/statsummary.go
+++ b/traffic_monitor/datareq/statsummary.go
@@ -31,8 +31,7 @@ import (
"github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
"github.com/apache/trafficcontrol/traffic_monitor/threadsafe"
"github.com/apache/trafficcontrol/traffic_monitor/todata"
-
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
type StatSummary struct {
@@ -70,10 +69,14 @@ func createStatSummary(statResultHistory
threadsafe.ResultStatHistory, filter ca
CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
}
- statResultHistory.Range(func(cacheName tc.CacheName, stats
threadsafe.ResultStatValHistory) bool {
+ statResultHistory.Range(func(cacheName tc.CacheName, interf string,
stats threadsafe.ResultStatValHistory) bool {
if !filter.UseCache(cacheName) {
return true
}
+ // for now only look at aggregate stats
+ if interf != tc.CacheInterfacesAggregate {
+ return true
+ }
ssStats := map[string]StatSummaryStat{}
stats.Range(func(statName string, statHistory
[]cache.ResultStatVal) bool {
if !filter.UseStat(statName) {
diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go
index 04fc90d..830818c 100644
--- a/traffic_monitor/health/cache.go
+++ b/traffic_monitor/health/cache.go
@@ -43,20 +43,24 @@ func GetVitals(newResult *cache.Result, prevResult
*cache.Result, mc *tc.Traffic
// proc.loadavg -- we're using the 1 minute average (!?)
newResult.Vitals.LoadAvg = newResult.Statistics.Loadavg.One
- // For now, just getting the first encountered interface in the list of
- // interfaces parsed out into statistics. In the future, vitals will
need
- // to consider all interfaces configured on a cache server.
- for name, iface := range newResult.Statistics.Interfaces {
- log.Infof("Cache '%s' has %d interfaces, we have arbitrarily
chosen %s for vitality", newResult.ID, len(newResult.Statistics.Interfaces),
name)
- newResult.Vitals.BytesOut = iface.BytesOut
- newResult.Vitals.BytesIn = iface.BytesIn
+ // We will use the average for vitality, this has the possibility of
overflow
+ // however the code had a few overflow issues that have been in
production for years
+ // at some point this will need to be changed when Vitals is revisited
+ log.Infof("Cache '%s' has %d interfaces, we will use average (overflow
possible) for vitality", newResult.ID, len(newResult.Statistics.Interfaces))
+ for _, iface := range newResult.Interfaces() {
+ // Overflow possible
+ newResult.Vitals.BytesOut += iface.BytesOut
+ newResult.Vitals.BytesIn += iface.BytesIn
// TODO JvD: Should we really be running this code every second
for every cache polled????? I don't think so.
- newResult.Vitals.MaxKbpsOut = iface.Speed * 1000
- break
+ newResult.Vitals.MaxKbpsOut += iface.Speed * 1000
}
+
+ newResult.Vitals.BytesOut = newResult.Vitals.BytesOut /
uint64(len(newResult.Interfaces()))
+ newResult.Vitals.BytesIn = newResult.Vitals.BytesIn /
uint64(len(newResult.Interfaces()))
+ newResult.Vitals.MaxKbpsOut = newResult.Vitals.MaxKbpsOut /
int64(len(newResult.Interfaces()))
if prevResult != nil && prevResult.Vitals.BytesOut != 0 {
elapsedTimeInSecs :=
float64(newResult.Time.UnixNano()-prevResult.Time.UnixNano()) / 1000000000
- newResult.Vitals.KbpsOut =
int64(float64(((newResult.Vitals.BytesOut - prevResult.Vitals.BytesOut) * 8 /
1000)) / elapsedTimeInSecs)
+ newResult.Vitals.KbpsOut =
int64(float64((newResult.Vitals.BytesOut-prevResult.Vitals.BytesOut)*8/1000) /
elapsedTimeInSecs)
}
}
@@ -152,6 +156,7 @@ func EvalCache(result cache.ResultInfo, resultStats
*threadsafe.ResultStatValHis
func CalcAvailability(results []cache.Result, pollerName string,
statResultHistory *threadsafe.ResultStatHistory, mc tc.TrafficMonitorConfigMap,
toData todata.TOData, localCacheStatusThreadsafe
threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events
ThreadsafeEvents, protocol config.PollingProtocol) {
localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
statResults := (*threadsafe.ResultStatValHistory)(nil)
+ statResultsVal := (*map[string]threadsafe.ResultStatValHistory)(nil)
processAvailableTuple := func(tuple cache.AvailableTuple, serverInfo
tc.TrafficServer) bool {
switch protocol {
case config.IPv4Only:
@@ -175,65 +180,141 @@ func CalcAvailability(results []cache.Result, pollerName
string, statResultHisto
for _, result := range results {
if statResultHistory != nil {
- statResultsVal :=
statResultHistory.LoadOrStore(tc.CacheName(result.ID))
- statResults = &statResultsVal
+ t :=
statResultHistory.LoadOrStore(tc.CacheName(result.ID))
+ statResultsVal = &t
}
- isAvailable, usingIPv4, whyAvailable, unavailableStat :=
EvalCache(cache.ToInfo(result), statResults, &mc)
-
- // if the cache is now Available, and was previously
unavailable due to a threshold, make sure this poller contains the stat which
exceeded the threshold.
- previousStatus, hasPreviousStatus :=
localCacheStatuses[tc.CacheName(result.ID)]
- availableTuple := cache.AvailableTuple{}
-
serverInfo, ok := mc.TrafficServer[result.ID]
if !ok {
log.Errorf("Cache %v missing from from Traffic Ops
Monitor Config - treating as OFFLINE\n", result.ID)
}
- if hasPreviousStatus {
- availableTuple = previousStatus.Available
- availableTuple.SetAvailability(usingIPv4, isAvailable)
+ resultInfo := cache.ToInfo(result)
+ for interfaceName, _ := range result.Interfaces() {
+ if statResultsVal != nil {
+ t := (*statResultsVal)[interfaceName]
+ statResults = &t
+ }
+ isAvailable, usingIPv4, whyAvailable, unavailableStat
:= EvalCache(resultInfo, statResults, &mc)
- if processAvailableTuple(availableTuple, serverInfo) {
- if
!processAvailableTuple(previousStatus.Available, serverInfo) &&
previousStatus.UnavailableStat != "" {
- if
!result.HasStat(previousStatus.UnavailableStat) {
- return
+ // if the cache is now Available, and was previously
unavailable due to a threshold, make sure this poller contains the stat which
exceeded the threshold.
+ previousStatus, hasPreviousStatus :=
localCacheStatuses[tc.CacheName(result.ID)][interfaceName]
+ availableTuple := cache.AvailableTuple{}
+
+ if hasPreviousStatus {
+ availableTuple = previousStatus.Available
+ availableTuple.SetAvailability(usingIPv4,
isAvailable)
+
+ if processAvailableTuple(availableTuple,
serverInfo) {
+ if
!processAvailableTuple(previousStatus.Available, serverInfo) &&
previousStatus.UnavailableStat != "" {
+ if
!result.HasStat(previousStatus.UnavailableStat) {
+ return
+ }
}
}
+ } else {
+ availableTuple.SetAvailability(usingIPv4,
isAvailable)
+ }
+
+ // update availableTuple so TM UI is updated if one IP
is removed
+ if availableTuple.IPv4 && serverInfo.IP == "" {
+ availableTuple.IPv4 = false
+ }
+ if availableTuple.IPv6 && serverInfo.IP6 == "" {
+ availableTuple.IPv6 = false
+ }
+
+ newAvailableState :=
processAvailableTuple(availableTuple, serverInfo)
+
+ if _, ok :=
localCacheStatuses[tc.CacheName(result.ID)]; !ok {
+ localCacheStatuses[tc.CacheName(result.ID)] =
make(map[string]cache.AvailableStatus)
}
- } else {
- availableTuple.SetAvailability(usingIPv4, isAvailable)
- }
- // update availableTuple so TM UI is updated if one IP is
removed
- if availableTuple.IPv4 && serverInfo.IP == "" {
- availableTuple.IPv4 = false
+
localCacheStatuses[tc.CacheName(result.ID)][interfaceName] =
cache.AvailableStatus{
+ Available: availableTuple,
+ ProcessedAvailable: newAvailableState,
+ Status:
mc.TrafficServer[string(result.ID)].ServerStatus,
+ Why: whyAvailable,
+ UnavailableStat: unavailableStat,
+ Poller: pollerName,
+ LastCheckedIPv4: usingIPv4,
+ } // TODO move within localStates?
}
- if availableTuple.IPv6 && serverInfo.IP6 == "" {
- availableTuple.IPv6 = false
+
+ // Compute aggregate data based on each interface
+ aggregateStatus := cache.AvailableStatus{
+ Available: cache.AvailableTuple{
+ IPv4: false,
+ IPv6: false,
+ },
+ ProcessedAvailable: false,
+ LastCheckedIPv4: false,
+ Status: "",
+ Why: "",
+ UnavailableStat: "",
+ Poller: "",
}
+ for interfaceName, status := range
localCacheStatuses[tc.CacheName(result.ID)] {
+ if interfaceName == tc.CacheInterfacesAggregate {
+ continue
+ }
+ aggregateStatus.Available.IPv4 =
aggregateStatus.Available.IPv4 || status.Available.IPv4
+ aggregateStatus.Available.IPv6 =
aggregateStatus.Available.IPv6 || status.Available.IPv6
- newAvailableState := processAvailableTuple(availableTuple,
serverInfo)
+ // What does this mean on aggregated data?
+ // For now assume that if any interface was checked via IPv4, then the
aggregate was
+ aggregateStatus.LastCheckedIPv4 =
aggregateStatus.LastCheckedIPv4 || status.LastCheckedIPv4
- localCacheStatuses[tc.CacheName(result.ID)] =
cache.AvailableStatus{
- Available: availableTuple,
- ProcessedAvailable: newAvailableState,
- Status:
mc.TrafficServer[string(result.ID)].ServerStatus,
- Why: whyAvailable,
- UnavailableStat: unavailableStat,
- Poller: pollerName,
- LastCheckedIPv4: usingIPv4,
- } // TODO move within localStates?
+ if status.Why != "" {
+ newWhyText := fmt.Sprintf("%s: %s",
interfaceName, status.Why)
+ if aggregateStatus.Why != "" {
+ newWhyText = ", " + newWhyText
+ }
+ aggregateStatus.Why += newWhyText
+ }
- if available, ok :=
localStates.GetCache(tc.CacheName(result.ID)); !ok || available.IsAvailable !=
newAvailableState {
+ if status.UnavailableStat != "" {
+ newUnavailableText := fmt.Sprintf("%s: %s",
interfaceName, status.UnavailableStat)
+ if aggregateStatus.UnavailableStat != "" {
+ newUnavailableText += ", " +
newUnavailableText
+ }
+ aggregateStatus.UnavailableStat =
newUnavailableText
+ }
+
+ // What does this mean on aggregated data?
+ // For now use random status unless a REPORTED status
is found
+ if tc.CacheStatus(aggregateStatus.Status) !=
tc.CacheStatusReported {
+ aggregateStatus.Status = status.Status
+ }
+
+ // Each interface in a cache should always have the
same poller
+ aggregateStatus.Poller = status.Poller
+ }
+ aggregateStatus.ProcessedAvailable =
processAvailableTuple(aggregateStatus.Available, serverInfo)
+
localCacheStatuses[tc.CacheName(result.ID)][tc.CacheInterfacesAggregate] =
aggregateStatus
+
+ localStates.SetCache(tc.CacheName(result.ID), tc.IsAvailable{
+ IsAvailable: aggregateStatus.ProcessedAvailable,
+ Ipv4Available: aggregateStatus.Available.IPv4,
+ Ipv6Available: aggregateStatus.Available.IPv6,
+ })
+
+ if statResultsVal != nil {
+ t := (*statResultsVal)[tc.CacheInterfacesAggregate]
+ statResults = &t
+ }
+ _, usingIPv4, whyAvailable, _ := EvalCache(resultInfo,
statResults, &mc)
+ if available, ok :=
localStates.GetCache(tc.CacheName(result.ID)); !ok || available.IsAvailable !=
aggregateStatus.ProcessedAvailable {
protocol := "IPv4"
if !usingIPv4 {
protocol = "IPv6"
}
- log.Infof("Changing state for %s was: %t now: %t
because %s poller: %v on protocol %v error: %v", result.ID,
available.IsAvailable, newAvailableState, whyAvailable, pollerName, protocol,
result.Error)
- events.Add(Event{Time: Time(time.Now()), Description:
"Protocol: (" + protocol + ") " + whyAvailable + " (" + pollerName + ")", Name:
string(result.ID), Hostname: string(result.ID), Type:
toData.ServerTypes[tc.CacheName(result.ID)].String(), Available:
newAvailableState, IPv4Available: availableTuple.IPv4, IPv6Available:
availableTuple.IPv6})
+ log.Infof("Changing state for %s was: %t now: %t
because %s poller: %v on protocol %v error: %v",
+ result.ID, available.IsAvailable,
aggregateStatus.ProcessedAvailable, whyAvailable, pollerName, protocol,
result.Error)
+ events.Add(Event{Time: Time(time.Now()), Description:
"Protocol: (" + protocol + ") " + whyAvailable +
+ " (" + pollerName + ")", Name: result.ID,
Hostname: result.ID,
+ Type:
toData.ServerTypes[tc.CacheName(result.ID)].String(), Available:
aggregateStatus.ProcessedAvailable,
+ IPv4Available: aggregateStatus.Available.IPv4,
IPv6Available: aggregateStatus.Available.IPv6})
}
-
- localStates.SetCache(tc.CacheName(result.ID),
tc.IsAvailable{IsAvailable: newAvailableState, Ipv4Available:
availableTuple.IPv4, Ipv6Available: availableTuple.IPv6})
}
calculateDeliveryServiceState(toData.DeliveryServiceServers,
localStates, toData)
localCacheStatusThreadsafe.Set(localCacheStatuses)
diff --git a/traffic_monitor/health/cache_test.go
b/traffic_monitor/health/cache_test.go
index d0c10d0..4451e69 100644
--- a/traffic_monitor/health/cache_test.go
+++ b/traffic_monitor/health/cache_test.go
@@ -20,11 +20,12 @@ package health
*/
import (
- "github.com/apache/trafficcontrol/traffic_monitor/config"
"strings"
"testing"
"time"
+ "github.com/apache/trafficcontrol/traffic_monitor/config"
+
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_monitor/cache"
"github.com/apache/trafficcontrol/traffic_monitor/peer"
@@ -52,6 +53,11 @@ func TestCalcAvailabilityThresholds(t *testing.T) {
BytesIn: 1234567891011121,
BytesOut: 12345678910111213,
},
+ "eth0": cache.Interface{
+ Speed: 30000,
+ BytesIn: 1234567891011121,
+ BytesOut: 12345678910111213,
+ },
},
NotAvailable: false,
},
@@ -113,20 +119,26 @@ func TestCalcAvailabilityThresholds(t *testing.T) {
CalcAvailability(results, pollerName, statResultHistory, mc, toData,
localCacheStatusThreadsafe, localStates, events, config.Both)
localCacheStatuses := localCacheStatusThreadsafe.Get()
- if localCacheStatus, ok := localCacheStatuses[tc.CacheName(result.ID)];
!ok {
+ if _, ok := localCacheStatuses[tc.CacheName(result.ID)]; !ok {
t.Fatalf("expected: localCacheStatus[cacheName], actual:
missing")
- } else if localCacheStatus.Available.IPv4 {
- t.Fatalf("localCacheStatus.Available.IPv4 over kbps threshold
expected: false, actual: true")
- } else if localCacheStatus.Available.IPv6 {
- t.Fatalf("localCacheStatus.Available.IPv6 over kbps threshold
expected: false, actual: true")
- } else if localCacheStatus.Status != string(tc.CacheStatusReported) {
- t.Fatalf("localCacheStatus.Status expected %v actual %v",
"todo", localCacheStatus.Status)
- } else if localCacheStatus.UnavailableStat !=
"availableBandwidthInKbps" {
- t.Fatalf("localCacheStatus.UnavailableStat expected %v actual
%v", "availableBandwidthInKbps", localCacheStatus.UnavailableStat)
- } else if localCacheStatus.Poller != pollerName {
- t.Fatalf("localCacheStatus.Poller expected %v actual %v",
pollerName, localCacheStatus.Poller)
- } else if !strings.Contains(localCacheStatus.Why,
"availableBandwidthInKbps too low") {
- t.Fatalf("localCacheStatus.Why expected
'availableBandwidthInKbps too low' actual %v", localCacheStatus.Why)
+ }
+ for interfaceName, localCacheStatus := range
localCacheStatuses[tc.CacheName(result.ID)] {
+ if interfaceName == tc.CacheInterfacesAggregate {
+ continue
+ }
+ if localCacheStatus.Available.IPv4 {
+ t.Fatalf("localCacheStatus.Available.IPv4 (%s) over
kbps threshold expected: false, actual: true", interfaceName)
+ } else if localCacheStatus.Available.IPv6 {
+ t.Fatalf("localCacheStatus.Available.IPv6 (%s) over
kbps threshold expected: false, actual: true", interfaceName)
+ } else if localCacheStatus.Status !=
string(tc.CacheStatusReported) {
+ t.Fatalf("localCacheStatus.Status (%s) expected %v
actual %v", interfaceName, "todo", localCacheStatus.Status)
+ } else if localCacheStatus.UnavailableStat !=
"availableBandwidthInKbps" {
+ t.Fatalf("localCacheStatus.UnavailableStat (%s)
expected %v actual %v", interfaceName, "availableBandwidthInKbps",
localCacheStatus.UnavailableStat)
+ } else if localCacheStatus.Poller != pollerName {
+ t.Fatalf("localCacheStatus.Poller (%s) expected %v
actual %v", interfaceName, pollerName, localCacheStatus.Poller)
+ } else if !strings.Contains(localCacheStatus.Why,
"availableBandwidthInKbps too low") {
+ t.Fatalf("localCacheStatus.Why (%s) expected
'availableBandwidthInKbps too low' actual %v", interfaceName,
localCacheStatus.Why)
+ }
}
// test that the health poll didn't override the stat poll threshold
markdown and mark available
@@ -143,9 +155,8 @@ func TestCalcAvailabilityThresholds(t *testing.T) {
CalcAvailability(healthResults, healthPollerName, nil, mc, toData,
localCacheStatusThreadsafe, localStates, events, config.Both)
localCacheStatuses = localCacheStatusThreadsafe.Get()
- if localCacheStatus, ok := localCacheStatuses[tc.CacheName(result.ID)];
!ok {
- t.Fatalf("expected: localCacheStatus[cacheName], actual:
missing")
- } else if localCacheStatus.Available.IPv4 {
+ localCacheStatus := localCacheStatuses[tc.CacheName(result.ID)]["bond0"]
+ if localCacheStatus.Available.IPv4 {
t.Fatalf("localCacheStatus.Available.IPv4 over kbps threshold
expected: false, actual: true")
} else if localCacheStatus.Available.IPv6 {
t.Fatalf("localCacheStatus.Available.IPv6 over kbps threshold
expected: false, actual: true")
diff --git a/traffic_monitor/threadsafe/cacheavailablestatus.go
b/traffic_monitor/threadsafe/cacheavailablestatus.go
index 107341d..2a9cec2 100644
--- a/traffic_monitor/threadsafe/cacheavailablestatus.go
+++ b/traffic_monitor/threadsafe/cacheavailablestatus.go
@@ -34,7 +34,7 @@ type CacheAvailableStatus struct {
// NewCacheAvailableStatus creates and returns a new CacheAvailableStatus,
initializing internal pointer values.
func NewCacheAvailableStatus() CacheAvailableStatus {
- c := cache.AvailableStatuses(map[tc.CacheName]cache.AvailableStatus{})
+ c :=
cache.AvailableStatuses(map[tc.CacheName]map[string]cache.AvailableStatus{})
return CacheAvailableStatus{m: &sync.RWMutex{}, caches: &c}
}
diff --git a/traffic_monitor/threadsafe/resultstathistory.go
b/traffic_monitor/threadsafe/resultstathistory.go
index e1fc55a..31a8919 100644
--- a/traffic_monitor/threadsafe/resultstathistory.go
+++ b/traffic_monitor/threadsafe/resultstathistory.go
@@ -32,7 +32,7 @@ import (
"github.com/apache/trafficcontrol/traffic_monitor/cache"
"github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
// ResultStatHistory provides safe access for multiple goroutines readers and
a single writer to a stored HistoryHistory object.
@@ -62,22 +62,38 @@ func (h *ResultInfoHistory) Set(v cache.ResultInfoHistory) {
h.m.Unlock()
}
-type ResultStatHistory struct{ *sync.Map } //
map[tc.CacheName]ResultStatValHistory
+type ResultStatHistory struct{ *sync.Map } //
map[tc.CacheName]map[interfaceName]ResultStatValHistory
func NewResultStatHistory() ResultStatHistory {
return ResultStatHistory{&sync.Map{}}
}
-func (h ResultStatHistory) LoadOrStore(cache tc.CacheName)
ResultStatValHistory {
+func (h ResultStatHistory) LoadOrStore(cache tc.CacheName)
map[string]ResultStatValHistory {
// TODO change to use sync.Pool?
- v, _ := h.Map.LoadOrStore(cache, NewResultStatValHistory())
- return v.(ResultStatValHistory)
+ v, loaded := h.Map.LoadOrStore(cache, NewResultStatValHistory())
+ if !loaded {
+ v = map[string]ResultStatValHistory{}
+ }
+ if rv, ok := v.(ResultStatValHistory); ok {
+ v =
map[string]ResultStatValHistory{tc.CacheInterfacesAggregate: rv}
+ }
+ return v.(map[string]ResultStatValHistory)
}
// Range behaves like sync.Map.Range. It calls f for every value in the map;
if f returns false, the iteration is stopped.
-func (h ResultStatHistory) Range(f func(cache tc.CacheName, val
ResultStatValHistory) bool) {
+func (h ResultStatHistory) Range(f func(cache tc.CacheName, interfaceName
string, val ResultStatValHistory) bool) {
h.Map.Range(func(k, v interface{}) bool {
- return f(k.(tc.CacheName), v.(ResultStatValHistory))
+ i, ok := v.(map[string]ResultStatValHistory)
+ if !ok {
+ log.Warnln("Cannot umarshal result stat val history")
+ return true
+ }
+ for a, b := range i {
+ if !f(k.(tc.CacheName), a, b) {
+ return false
+ }
+ }
+ return true
})
}
@@ -89,11 +105,11 @@ func NewResultStatValHistory() ResultStatValHistory {
return ResultStatValHistor
// Load returns the []ResultStatVal for the given stat. If the given stat does
not exist, nil is returned.
func (h ResultStatValHistory) Load(stat string) []cache.ResultStatVal {
- v, ok := h.Map.Load(stat)
+ i, ok := h.Map.Load(stat)
if !ok {
return nil
}
- return v.([]cache.ResultStatVal)
+ return i.([]cache.ResultStatVal)
}
// Range behaves like sync.Map.Range. It calls f for every value in the map;
if f returns false, the iteration is stopped.
@@ -104,7 +120,8 @@ func (h ResultStatValHistory) Range(f func(stat string, val
[]cache.ResultStatVa
}
// Store stores the given []ResultStatVal in the ResultStatValHistory for the
given stat. Store is threadsafe for only one writer.
-// Specifically, if there are multiple writers, there's a race, that one
writer could Load(), another writer could Store() underneath it, and the first
writer would then Store() having lost values.
+// Specifically, if there are multiple writers, there's a race, that one
writer could Load(), another writer could Store() underneath it,
+// and the first writer would then Store() having lost values.
// To safely use ResultStatValHistory with multiple writers, a CompareAndSwap
function would have to be added.
func (h ResultStatValHistory) Store(stat string, vals []cache.ResultStatVal) {
h.Map.Store(stat, vals)
@@ -119,40 +136,42 @@ func (a ResultStatHistory) Add(r cache.Result, limit
uint64) error {
}
for statName, statVal := range r.Miscellaneous {
- statHistory := resultHistory.Load(statName)
- if len(statHistory) == 0 {
- statHistory = make([]cache.ResultStatVal, 0, limit) //
initialize to the limit, to avoid multiple allocations. TODO put in
.Load(statName, defaultSize)?
- }
-
- // TODO check len(statHistory) == 0 before indexing, potential
panic?
-
- ok, err := newStatEqual(statHistory, statVal)
-
- // If the new stat value is the same as the last, update the
time and increment the span. Span is the number of polls the latest value has
been the same, and hence the length of time it's been the same is
span*pollInterval.
- if err != nil {
- errStrs += "cannot add stat " + statName + ": " +
err.Error() + "; "
- } else if ok {
- statHistory[0].Time = r.Time
- statHistory[0].Span++
- } else {
- resultVal := cache.ResultStatVal{
- Val: statVal,
- Time: r.Time,
- Span: 1,
+ for interfaceName, _ := range resultHistory {
+ statHistory :=
resultHistory[interfaceName].Load(statName)
+ if len(statHistory) == 0 {
+ statHistory = make([]cache.ResultStatVal, 0,
limit) // initialize to the limit, to avoid multiple allocations. TODO put in
.Load(statName, defaultSize)?
}
- if len(statHistory) > int(limit) {
- statHistory = statHistory[:int(limit)]
- } else if len(statHistory) < int(limit) {
- statHistory = append(statHistory,
cache.ResultStatVal{})
- }
- // shift all values to the right, in order to put the
new val at the beginning. Faster than allocating memory again
- for i := len(statHistory) - 1; i >= 1; i-- {
- statHistory[i] = statHistory[i-1]
+ // TODO check len(statHistory) == 0 before indexing,
potential panic?
+
+ ok, err := newStatEqual(statHistory, statVal)
+
+ // If the new stat value is the same as the last,
update the time and increment the span. Span is the number of polls the latest
value has been the same, and hence the length of time it's been the same is
span*pollInterval.
+ if err != nil {
+ errStrs += "cannot add stat " + statName + ": "
+ err.Error() + "; "
+ } else if ok {
+ statHistory[0].Time = r.Time
+ statHistory[0].Span++
+ } else {
+ resultVal := cache.ResultStatVal{
+ Val: statVal,
+ Time: r.Time,
+ Span: 1,
+ }
+
+ if len(statHistory) > int(limit) {
+ statHistory = statHistory[:int(limit)]
+ } else if len(statHistory) < int(limit) {
+ statHistory = append(statHistory,
cache.ResultStatVal{})
+ }
+ // shift all values to the right, in order to
put the new val at the beginning. Faster than allocating memory again
+ for i := len(statHistory) - 1; i >= 1; i-- {
+ statHistory[i] = statHistory[i-1]
+ }
+ statHistory[0] = resultVal // new result at the
beginning
}
- statHistory[0] = resultVal // new result at the
beginning
+ resultHistory[interfaceName].Store(statName,
statHistory)
}
- resultHistory.Store(statName, statHistory)
}
if errStrs != "" {
@@ -187,7 +206,7 @@ func newStatEqual(history []cache.ResultStatVal, stat
interface{}) (bool, error)
func StatsMarshall(statResultHistory ResultStatHistory, statInfo
cache.ResultInfoHistory, combinedStates tc.CRStates, monitorConfig
tc.TrafficMonitorConfigMap, statMaxKbpses cache.Kbpses, filter cache.Filter,
params url.Values) ([]byte, error) {
stats := cache.Stats{
CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
- Caches:
map[tc.CacheName]map[string][]cache.ResultStatVal{},
+ Caches:
map[tc.CacheName]map[string]map[string][]cache.ResultStatVal{},
}
computedStats := cache.ComputedStats()
@@ -200,24 +219,30 @@ func StatsMarshall(statResultHistory ResultStatHistory,
statInfo cache.ResultInf
}
cacheStatResultHistory := statResultHistory.LoadOrStore(id)
- cacheStatResultHistory.Range(func(stat string, vals
[]cache.ResultStatVal) bool {
- stat = "ats." + stat // TM1 prefixes ATS stats with
'ats.'
- if !filter.UseStat(stat) {
- return true
- }
- historyCount := 1
- for _, val := range vals {
- if !filter.WithinStatHistoryMax(historyCount) {
- break
+ for interfaceName, interfaceHistory := range
cacheStatResultHistory {
+ interfaceHistory.Range(func(stat string, vals
[]cache.ResultStatVal) bool {
+ stat = "ats." + stat // TM1 prefixes ATS stats
with 'ats.'
+ if !filter.UseStat(stat) {
+ return true
}
+ historyCount := 1
if _, ok := stats.Caches[id]; !ok {
- stats.Caches[id] =
map[string][]cache.ResultStatVal{}
+ stats.Caches[id] =
map[string]map[string][]cache.ResultStatVal{}
}
- stats.Caches[id][stat] =
append(stats.Caches[id][stat], val)
- historyCount += int(val.Span)
- }
- return true
- })
+ for _, val := range vals {
+ if
!filter.WithinStatHistoryMax(historyCount) {
+ break
+ }
+ if _, ok :=
stats.Caches[id][interfaceName]; !ok {
+ stats.Caches[id][interfaceName]
= map[string][]cache.ResultStatVal{}
+ }
+ stats.Caches[id][interfaceName][stat] =
append(stats.Caches[id][interfaceName][stat], val)
+					// TODO: add for each interface?
+ historyCount += int(val.Span)
+ }
+ return true
+ })
+ }
serverInfo, ok := monitorConfig.TrafficServer[string(id)]
if !ok {
@@ -234,7 +259,7 @@ func StatsMarshall(statResultHistory ResultStatHistory,
statInfo cache.ResultInf
break
}
if _, ok := stats.Caches[id]; !ok {
- stats.Caches[id] =
map[string][]cache.ResultStatVal{}
+ stats.Caches[id] =
map[string]map[string][]cache.ResultStatVal{}
}
t := resultInfo.Time
@@ -243,7 +268,19 @@ func StatsMarshall(statResultHistory ResultStatHistory,
statInfo cache.ResultInf
if !filter.UseStat(stat) {
continue
}
- stats.Caches[id][stat] =
append(stats.Caches[id][stat], cache.ResultStatVal{Val: statValF(resultInfo,
serverInfo, serverProfile, combinedStatesCache), Time: t, Span: 1}) //
combinedState will default to unavailable
+ if _, ok :=
stats.Caches[id][tc.CacheInterfacesAggregate]; !ok {
+
stats.Caches[id][tc.CacheInterfacesAggregate] =
map[string][]cache.ResultStatVal{}
+ }
+
stats.Caches[id][tc.CacheInterfacesAggregate][stat] =
append(stats.Caches[id][tc.CacheInterfacesAggregate][stat],
+ cache.ResultStatVal{Val:
statValF(resultInfo, serverInfo, serverProfile, combinedStatesCache), Time: t,
Span: 1})
+				// TODO: properly handle individual interfaces; requires the vitals code to be refactored first
+				for infName, _ := range
resultInfo.Statistics.Interfaces {
+ if _, ok := stats.Caches[id][infName];
!ok {
+ stats.Caches[id][infName] =
map[string][]cache.ResultStatVal{}
+ }
+ stats.Caches[id][infName][stat] =
append(stats.Caches[id][infName][stat],
+ cache.ResultStatVal{Val:
statValF(resultInfo, serverInfo, serverProfile, combinedStatesCache), Time: t,
Span: 1})
+ }
}
}
}
diff --git a/traffic_monitor/threadsafe/resultstathistory_test.go
b/traffic_monitor/threadsafe/resultstathistory_test.go
index 87d0a45..fa8ae82 100644
--- a/traffic_monitor/threadsafe/resultstathistory_test.go
+++ b/traffic_monitor/threadsafe/resultstathistory_test.go
@@ -29,8 +29,7 @@ import (
"github.com/apache/trafficcontrol/lib/go-tc"
"github.com/apache/trafficcontrol/traffic_monitor/cache"
"github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
-
- "github.com/json-iterator/go"
+ jsoniter "github.com/json-iterator/go"
)
func randResultStatHistory() ResultStatHistory {
diff --git a/traffic_ops/traffic_ops_golang/deliveryservice/capacity.go
b/traffic_ops/traffic_ops_golang/deliveryservice/capacity.go
index b653c37..fda184e 100644
--- a/traffic_ops/traffic_ops_golang/deliveryservice/capacity.go
+++ b/traffic_ops/traffic_ops_golang/deliveryservice/capacity.go
@@ -135,29 +135,36 @@ const StatNameKBPS = "kbps"
const StatNameMaxKBPS = "maxKbps"
func addCapacity(cap CapData, ds tc.DeliveryServiceName, cacheStats
tmcache.Stats, crStates tc.CRStates, crConfig tc.CRConfig, thresholds
map[string]float64) CapData {
- for cacheName, stats := range cacheStats.Caches {
+ for cacheName, interfaces := range cacheStats.Caches {
+ if _, ok := interfaces[tc.CacheInterfacesAggregate]; !ok {
+ log.Warnf("No %s interface on cache server %s",
tc.CacheInterfacesAggregate, cacheName)
+ continue
+ }
cache, ok := crConfig.ContentServers[string(cacheName)]
if !ok {
log.Warnln("Getting delivery service capacity: delivery
service '" + string(ds) + "' cache '" + string(cacheName) + "' in CacheStats
but not CRConfig, skipping")
continue
}
+
if _, ok := cache.DeliveryServices[string(ds)]; !ok {
continue
}
if cache.ServerType == nil ||
!strings.HasPrefix(string(*cache.ServerType), string(tc.CacheTypeEdge)) {
continue
}
- if len(stats[StatNameKBPS]) < 1 || len(stats[StatNameMaxKBPS])
< 1 {
+
+ stat := interfaces[tc.CacheInterfacesAggregate]
+ if len(stat[StatNameKBPS]) < 1 || len(stat[StatNameMaxKBPS]) <
1 {
log.Warnln("Getting delivery service capacity: delivery
service '" + string(ds) + "' cache '" + string(cacheName) + "' CacheStats has
no kbps or maxKbps, skipping")
continue
}
- kbps, err := statToFloat(stats[StatNameKBPS][0].Val)
+ kbps, err := statToFloat(stat[StatNameKBPS][0].Val)
if err != nil {
log.Warnln("Getting delivery service capacity: delivery
service '" + string(ds) + "' cache '" + string(cacheName) + "' CacheStats kbps
is not a number, skipping")
continue
}
- maxKBPS, err := statToFloat(stats[StatNameMaxKBPS][0].Val)
+ maxKBPS, err := statToFloat(stat[StatNameMaxKBPS][0].Val)
if err != nil {
log.Warnln("Getting delivery service capacity: delivery
service '" + string(ds) + "' cache '" + string(cacheName) + "' CacheStats
maxKps is not a number, skipping")
continue
@@ -182,6 +189,7 @@ func addCapacity(cap CapData, ds tc.DeliveryServiceName,
cacheStats tmcache.Stat
continue // don't add capacity for OFFLINE or other
statuses
}
cap.Capacity += maxKBPS - thresholds[*cache.Profile]
+
}
return cap
}