dneuman64 closed pull request #3007: Change Monitor stat history to sync.Map
URL: https://github.com/apache/trafficcontrol/pull/3007
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff once such a PR is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/traffic_monitor/cache/astats_test.go 
b/traffic_monitor/cache/astats_test.go
index ed8869161..7cd3b5256 100644
--- a/traffic_monitor/cache/astats_test.go
+++ b/traffic_monitor/cache/astats_test.go
@@ -52,7 +52,6 @@ func getMockTODataDSNameDirectMatches() 
map[tc.DeliveryServiceName]string {
        }
 }
 
-// ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, subdomain, 
subsubdomain)
 func getMockTOData(dsNameFQDNs map[tc.DeliveryServiceName]string) 
todata.TOData {
        tod := todata.New()
        for dsName, dsDirectMatch := range dsNameFQDNs {
diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go
index 6e4f8c6f9..a6ec4a78f 100644
--- a/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/cache/cache.go
@@ -20,10 +20,8 @@ package cache
  */
 
 import (
-       "encoding/json"
        "fmt"
        "io"
-       "net/url"
        "time"
 
        "github.com/apache/trafficcontrol/lib/go-log"
@@ -204,72 +202,6 @@ func ComputedStats() map[string]StatComputeFunc {
        }
 }
 
-// StatsMarshall encodes the stats in JSON, encoding up to historyCount of 
each stat. If statsToUse is empty, all stats are encoded; otherwise, only the 
given stats are encoded. If wildcard is true, stats which contain the text in 
each statsToUse are returned, instead of exact stat names. If cacheType is not 
CacheTypeInvalid, only stats for the given type are returned. If hosts is not 
empty, only the given hosts are returned.
-func StatsMarshall(statResultHistory ResultStatHistory, statInfo 
ResultInfoHistory, combinedStates tc.CRStates, monitorConfig 
tc.TrafficMonitorConfigMap, statMaxKbpses Kbpses, filter Filter, params 
url.Values) ([]byte, error) {
-       stats := Stats{
-               CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
-               Caches:        map[tc.CacheName]map[string][]ResultStatVal{},
-       }
-
-       computedStats := ComputedStats()
-
-       // TODO in 1.0, stats are divided into 'location', 'cache', and 'type'. 
'cache' are hidden by default.
-
-       for id, combinedStatesCache := range combinedStates.Caches {
-               if !filter.UseCache(id) {
-                       continue
-               }
-
-               for stat, vals := range statResultHistory[id] {
-                       stat = "ats." + stat // TM1 prefixes ATS stats with 
'ats.'
-                       if !filter.UseStat(stat) {
-                               continue
-                       }
-                       historyCount := 1
-                       for _, val := range vals {
-                               if !filter.WithinStatHistoryMax(historyCount) {
-                                       break
-                               }
-                               if _, ok := stats.Caches[id]; !ok {
-                                       stats.Caches[id] = 
map[string][]ResultStatVal{}
-                               }
-                               stats.Caches[id][stat] = 
append(stats.Caches[id][stat], val)
-                               historyCount += int(val.Span)
-                       }
-               }
-
-               serverInfo, ok := monitorConfig.TrafficServer[string(id)]
-               if !ok {
-                       log.Warnf("cache.StatsMarshall server %s missing from 
monitorConfig\n", id)
-               }
-
-               serverProfile, ok := monitorConfig.Profile[serverInfo.Profile]
-               if !ok {
-                       log.Warnf("cache.StatsMarshall server %s missing 
profile in monitorConfig\n", id)
-               }
-
-               for i, resultInfo := range statInfo[id] {
-                       if !filter.WithinStatHistoryMax(i + 1) {
-                               break
-                       }
-                       if _, ok := stats.Caches[id]; !ok {
-                               stats.Caches[id] = map[string][]ResultStatVal{}
-                       }
-
-                       t := resultInfo.Time
-
-                       for stat, statValF := range computedStats {
-                               if !filter.UseStat(stat) {
-                                       continue
-                               }
-                               stats.Caches[id][stat] = 
append(stats.Caches[id][stat], ResultStatVal{Val: statValF(resultInfo, 
serverInfo, serverProfile, combinedStatesCache), Time: t, Span: 1}) // 
combinedState will default to unavailable
-                       }
-               }
-       }
-
-       return json.Marshal(stats)
-}
-
 // Handle handles results fetched from a cache, parsing the raw Reader data 
and passing it along to a chan for further processing.
 func (handler Handler) Handle(id string, rdr io.Reader, format string, reqTime 
time.Duration, reqEnd time.Time, reqErr error, pollID uint64, pollFinished 
chan<- uint64) {
        log.Debugf("poll %v %v (format '%v') handle start\n", pollID, 
time.Now(), format)
diff --git a/traffic_monitor/cache/cache_test.go 
b/traffic_monitor/cache/cache_test.go
index 3ee0df51c..cdb02ab33 100644
--- a/traffic_monitor/cache/cache_test.go
+++ b/traffic_monitor/cache/cache_test.go
@@ -20,13 +20,9 @@ package cache
  */
 
 import (
-       "encoding/json"
-       "net/url"
        "testing"
-       "time"
 
        "github.com/apache/trafficcontrol/lib/go-tc"
-       "github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
        "github.com/apache/trafficcontrol/traffic_monitor/todata"
 )
 
@@ -53,39 +49,3 @@ func (f DummyFilterNever) UseCache(name tc.CacheName) bool {
 func (f DummyFilterNever) WithinStatHistoryMax(i int) bool {
        return false
 }
-
-func TestStatsMarshall(t *testing.T) {
-       statHist := randResultStatHistory()
-       infHist := randResultInfoHistory()
-       filter := DummyFilterNever{}
-       params := url.Values{}
-       beforeStatsMarshall := time.Now()
-       bytes, err := StatsMarshall(statHist, infHist, tc.CRStates{}, 
tc.TrafficMonitorConfigMap{}, Kbpses{}, filter, params)
-       afterStatsMarshall := time.Now()
-       if err != nil {
-               t.Fatalf("StatsMarshall return expected nil err, actual err: 
%v", err)
-       }
-       // if len(bytes) > 0 {
-       //      t.Errorf("expected empty bytes, actual: %v", string(bytes))
-       // }
-
-       stats := Stats{}
-       if err := json.Unmarshal(bytes, &stats); err != nil {
-               t.Fatalf("unmarshalling expected nil err, actual err: %v", err)
-       }
-
-       if stats.CommonAPIData.QueryParams != "" {
-               t.Errorf(`unmarshalling stats.CommonAPIData.QueryParams 
expected "", actual %v`, stats.CommonAPIData.QueryParams)
-       }
-
-       statsDate, err := time.Parse(srvhttp.CommonAPIDataDateFormat, 
stats.CommonAPIData.DateStr)
-       if err != nil {
-               t.Errorf(`stats.CommonAPIData.DateStr expected format %v, 
actual %v`, srvhttp.CommonAPIDataDateFormat, stats.CommonAPIData.DateStr)
-       }
-       if beforeStatsMarshall.Truncate(time.Second).After(statsDate) || 
statsDate.Truncate(time.Second).After(afterStatsMarshall.Truncate(time.Second)) 
{ // round to second, because CommonAPIDataDateFormat is second-precision
-               t.Errorf(`unmarshalling stats.CommonAPIData.DateStr expected 
between %v and %v, actual %v`, beforeStatsMarshall, afterStatsMarshall, 
stats.CommonAPIData.DateStr)
-       }
-       if len(stats.Caches) > 0 {
-               t.Errorf(`unmarshalling stats.Caches expected empty, actual 
%+v`, stats.Caches)
-       }
-}
diff --git a/traffic_monitor/cache/data.go b/traffic_monitor/cache/data.go
index bb6daa5c1..c23dc970d 100644
--- a/traffic_monitor/cache/data.go
+++ b/traffic_monitor/cache/data.go
@@ -21,14 +21,13 @@ package cache
 
 import (
        "encoding/json"
-       "errors"
        "fmt"
        "time"
 
        "github.com/apache/trafficcontrol/lib/go-tc"
 )
 
-// CacheAvailableStatusReported is the status string returned by caches set to 
"reported" in Traffic Ops.
+// AvailableStatusReported is the status string returned by caches set to 
"reported" in Traffic Ops.
 // TODO put somewhere more generic
 const AvailableStatusReported = "REPORTED"
 
@@ -73,11 +72,6 @@ func (a ResultHistory) Copy() ResultHistory {
        return b
 }
 
-// ResultStatHistory is a map[cache][statName]val
-type ResultStatHistory map[tc.CacheName]ResultStatValHistory
-
-type ResultStatValHistory map[string][]ResultStatVal
-
 // ResultStatVal is the value of an individual stat returned from a poll. Time 
is the time this stat was returned.
 // Span is the number of polls this stat has been the same. For example, if 
History is set to 100, and the last 50 polls had the same value for this stat 
(but none of the previous 50 were the same), this stat's map value slice will 
actually contain 51 entries, and the first entry will have the value, the time 
of the last poll, and a Span of 50. Assuming the poll time is every 8 seconds, 
users will then know, looking at the Span, that the value was unchanged for the 
last 50*8=400 seconds.
 // JSON values are all strings, for the TM1.0 /publish/CacheStats API.
@@ -100,28 +94,6 @@ func (t *ResultStatVal) MarshalJSON() ([]byte, error) {
        return json.Marshal(&v)
 }
 
-func copyResultStatVals(a []ResultStatVal) []ResultStatVal {
-       b := make([]ResultStatVal, len(a), len(a))
-       copy(b, a)
-       return b
-}
-
-func copyResultStatValHistory(a ResultStatValHistory) ResultStatValHistory {
-       b := ResultStatValHistory{}
-       for k, v := range a {
-               b[k] = copyResultStatVals(v) // TODO determine if necessary
-       }
-       return b
-}
-
-func (a ResultStatHistory) Copy() ResultStatHistory {
-       b := ResultStatHistory{}
-       for k, v := range a {
-               b[k] = copyResultStatValHistory(v)
-       }
-       return b
-}
-
 func pruneStats(history []ResultStatVal, limit uint64) []ResultStatVal {
        if uint64(len(history)) > limit {
                history = history[:limit-1]
@@ -129,58 +101,6 @@ func pruneStats(history []ResultStatVal, limit uint64) 
[]ResultStatVal {
        return history
 }
 
-// newStatEqual Returns whether the given stat is equal to the latest stat in 
history. If len(history)==0, this returns false without error. If the given 
stat is not a JSON primitive (string, number, bool), this returns an error. We 
explicitly refuse to compare arrays and objects, for performance.
-func newStatEqual(history []ResultStatVal, stat interface{}) (bool, error) {
-       if len(history) == 0 {
-               return false, nil // if there's no history, it's "not equal", 
i.e. store this new history
-       }
-       switch stat.(type) {
-       case string:
-       case float64:
-       case bool:
-       default:
-               return false, fmt.Errorf("incomparable stat type %T", stat)
-       }
-       switch history[0].Val.(type) {
-       case string:
-       case float64:
-       case bool:
-       default:
-               return false, fmt.Errorf("incomparable history stat type %T", 
stat)
-       }
-       return stat == history[0].Val, nil
-}
-
-func (a ResultStatHistory) Add(r Result, limit uint64) error {
-       errStrs := ""
-       for statName, statVal := range r.Astats.Ats {
-               statHistory := a[r.ID][statName]
-               // If the new stat value is the same as the last, update the 
time and increment the span. Span is the number of polls the latest value has 
been the same, and hence the length of time it's been the same is 
span*pollInterval.
-               if ok, err := newStatEqual(statHistory, statVal); err != nil {
-                       errStrs += "cannot add stat " + statName + ": " + 
err.Error() + "; "
-               } else if ok {
-                       statHistory[0].Time = r.Time
-                       statHistory[0].Span++
-               } else {
-                       resultVal := ResultStatVal{
-                               Val:  statVal,
-                               Time: r.Time,
-                               Span: 1,
-                       }
-                       statHistory = 
pruneStats(append([]ResultStatVal{resultVal}, statHistory...), limit)
-               }
-               if _, ok := a[r.ID]; !ok {
-                       a[r.ID] = ResultStatValHistory{}
-               }
-               a[r.ID][statName] = statHistory // TODO determine if necessary 
for the first conditional
-       }
-
-       if errStrs != "" {
-               return errors.New("some stats could not be added: " + 
errStrs[:len(errStrs)-2])
-       }
-       return nil
-}
-
 // TODO determine if anything ever needs more than the latest, and if not, 
change ResultInfo to not be a slice.
 type ResultInfoHistory map[tc.CacheName][]ResultInfo
 
diff --git a/traffic_monitor/cache/data_test.go 
b/traffic_monitor/cache/data_test.go
index 794e4efcb..0521d423f 100644
--- a/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/cache/data_test.go
@@ -246,64 +246,6 @@ func randResultHistory() ResultHistory {
        return a
 }
 
-func randResultStatVal() ResultStatVal {
-       return ResultStatVal{
-               Val:  uint64(rand.Int63()),
-               Time: time.Now(),
-               Span: uint64(rand.Int63()),
-       }
-}
-
-func randResultStatValHistory() ResultStatValHistory {
-       a := ResultStatValHistory{}
-       num := 5
-       numSlice := 5
-       for i := 0; i < num; i++ {
-               cacheName := randStr()
-               for j := 0; j < numSlice; j++ {
-                       a[cacheName] = append(a[cacheName], randResultStatVal())
-               }
-       }
-       return a
-}
-
-func randResultStatHistory() ResultStatHistory {
-       hist := ResultStatHistory{}
-
-       num := 5
-       for i := 0; i < num; i++ {
-               hist[tc.CacheName(randStr())] = randResultStatValHistory()
-       }
-       return hist
-}
-
-func randResultInfoHistory() ResultInfoHistory {
-       // type ResultInfoHistory map[tc.CacheName][]ResultInfo
-       hist := ResultInfoHistory{}
-
-       num := 5
-       infNum := 5
-       for i := 0; i < num; i++ {
-               cacheName := tc.CacheName(randStr())
-               for j := 0; j < infNum; j++ {
-                       hist[cacheName] = append(hist[cacheName], 
randResultInfo())
-               }
-       }
-       return hist
-}
-
-func randResultInfo() ResultInfo {
-       return ResultInfo{
-               ID:          tc.CacheName(randStr()),
-               Error:       fmt.Errorf(randStr()),
-               Time:        time.Now(),
-               RequestTime: time.Millisecond * time.Duration(rand.Int()),
-               Vitals:      randVitals(),
-               PollID:      uint64(rand.Int63()),
-               Available:   randBool(),
-       }
-}
-
 func TestResultHistoryCopy(t *testing.T) {
        num := 5
        for i := 0; i < num; i++ {
diff --git a/traffic_monitor/datareq/cachestat.go 
b/traffic_monitor/datareq/cachestat.go
index 32a93d075..16ee02eaf 100644
--- a/traffic_monitor/datareq/cachestat.go
+++ b/traffic_monitor/datareq/cachestat.go
@@ -23,7 +23,6 @@ import (
        "net/http"
        "net/url"
 
-       "github.com/apache/trafficcontrol/traffic_monitor/cache"
        "github.com/apache/trafficcontrol/traffic_monitor/peer"
        "github.com/apache/trafficcontrol/traffic_monitor/threadsafe"
        "github.com/apache/trafficcontrol/traffic_monitor/todata"
@@ -35,6 +34,6 @@ func srvCacheStats(params url.Values, errorCount 
threadsafe.Uint, path string, t
                HandleErr(errorCount, path, err)
                return []byte(err.Error()), http.StatusBadRequest
        }
-       bytes, err := cache.StatsMarshall(statResultHistory.Get(), 
statInfoHistory.Get(), combinedStates.Get(), monitorConfig.Get(), 
statMaxKbpses.Get(), filter, params)
+       bytes, err := threadsafe.StatsMarshall(statResultHistory, 
statInfoHistory.Get(), combinedStates.Get(), monitorConfig.Get(), 
statMaxKbpses.Get(), filter, params)
        return WrapErrCode(errorCount, path, bytes, err)
 }
diff --git a/traffic_monitor/datareq/cachestate.go 
b/traffic_monitor/datareq/cachestate.go
index 5c1c9f3d8..0f87f6cc0 100644
--- a/traffic_monitor/datareq/cachestate.go
+++ b/traffic_monitor/datareq/cachestate.go
@@ -68,13 +68,13 @@ func srvAPICacheStates(
        statMaxKbpses threadsafe.CacheKbpses,
        monitorConfig threadsafe.TrafficMonitorConfigMap,
 ) ([]byte, error) {
-       return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, 
statInfoHistory.Get(), statResultHistory.Get(), healthHistory.Get(), 
lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), 
localCacheStatus, statMaxKbpses, monitorConfig.Get().TrafficServer))
+       return json.Marshal(createCacheStatuses(toData.Get().ServerTypes, 
statInfoHistory.Get(), statResultHistory, healthHistory.Get(), 
lastHealthDurations.Get(), localStates.Get().Caches, lastStats.Get(), 
localCacheStatus, statMaxKbpses, monitorConfig.Get().TrafficServer))
 }
 
 func createCacheStatuses(
        cacheTypes map[tc.CacheName]tc.CacheType,
        statInfoHistory cache.ResultInfoHistory,
-       statResultHistory cache.ResultStatHistory,
+       statResultHistory threadsafe.ResultStatHistory,
        healthHistory map[tc.CacheName][]cache.Result,
        lastHealthDurations map[tc.CacheName]time.Duration,
        cacheStates map[tc.CacheName]tc.IsAvailable,
@@ -200,20 +200,22 @@ func cacheStatusAndPoller(server tc.CacheName, serverInfo 
tc.TrafficServer, loca
        return fmt.Sprintf("%s - unavailable", statusVal.Status), 
statusVal.Poller
 }
 
-func createCacheConnections(statResultHistory cache.ResultStatHistory) 
map[tc.CacheName]int64 {
+func createCacheConnections(statResultHistory threadsafe.ResultStatHistory) 
map[tc.CacheName]int64 {
        conns := map[tc.CacheName]int64{}
-       for server, history := range statResultHistory {
-               vals, ok := 
history["proxy.process.http.current_client_connections"]
-               if !ok || len(vals) < 1 {
-                       continue
+       statResultHistory.Range(func(server tc.CacheName, history 
threadsafe.ResultStatValHistory) bool {
+               // for server, history := range statResultHistory {
+               vals := 
history.Load("proxy.process.http.current_client_connections")
+               if len(vals) == 0 {
+                       return true
                }
 
                v, ok := vals[0].Val.(float64)
                if !ok {
-                       continue // TODO log warning? error?
+                       return true // TODO log warning? error?
                }
                conns[server] = int64(v)
-       }
+               return true
+       })
        return conns
 }
 
diff --git a/traffic_monitor/datareq/statsummary.go 
b/traffic_monitor/datareq/statsummary.go
index c06323fc6..85822ff5f 100644
--- a/traffic_monitor/datareq/statsummary.go
+++ b/traffic_monitor/datareq/statsummary.go
@@ -56,27 +56,28 @@ func srvStatSummary(params url.Values, errorCount 
threadsafe.Uint, path string,
                HandleErr(errorCount, path, err)
                return []byte(err.Error()), http.StatusBadRequest
        }
-       bytes, err := json.Marshal(createStatSummary(statResultHistory.Get(), 
filter, params))
+       bytes, err := json.Marshal(createStatSummary(statResultHistory, filter, 
params))
        return WrapErrCode(errorCount, path, bytes, err)
 }
 
-func createStatSummary(statResultHistory cache.ResultStatHistory, filter 
cache.Filter, params url.Values) StatSummary {
+func createStatSummary(statResultHistory threadsafe.ResultStatHistory, filter 
cache.Filter, params url.Values) StatSummary {
        statPrefix := "ats."
        ss := StatSummary{
                Caches:        map[tc.CacheName]map[string]StatSummaryStat{},
                CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
        }
-       for cache, stats := range statResultHistory {
-               if !filter.UseCache(cache) {
-                       continue
+
+       statResultHistory.Range(func(cacheName tc.CacheName, stats 
threadsafe.ResultStatValHistory) bool {
+               if !filter.UseCache(cacheName) {
+                       return true
                }
                ssStats := map[string]StatSummaryStat{}
-               for statName, statHistory := range stats {
+               stats.Range(func(statName string, statHistory 
[]cache.ResultStatVal) bool {
                        if !filter.UseStat(statName) {
-                               continue
+                               return true
                        }
                        if len(statHistory) == 0 {
-                               continue
+                               return true
                        }
                        ssStat := StatSummaryStat{}
                        msPerNs := int64(1000000)
@@ -85,7 +86,7 @@ func createStatSummary(statResultHistory 
cache.ResultStatHistory, filter cache.F
                        oldestVal, isOldestValNumeric := 
util.ToNumeric(statHistory[len(statHistory)-1].Val)
                        newestVal, isNewestValNumeric := 
util.ToNumeric(statHistory[0].Val)
                        if !isOldestValNumeric || !isNewestValNumeric {
-                               continue // skip non-numeric stats
+                               return true // skip non-numeric stats
                        }
                        ssStat.Start = oldestVal
                        ssStat.End = newestVal
@@ -95,7 +96,7 @@ func createStatSummary(statResultHistory 
cache.ResultStatHistory, filter cache.F
                                fVal, ok := util.ToNumeric(val.Val)
                                if !ok {
                                        log.Warnf("threshold stat %v value %v 
is not a number, cannot use.", statName, val.Val)
-                                       continue
+                                       return true
                                }
                                for i := uint64(0); i < val.Span; i++ {
                                        ssStat.DataPointCount++
@@ -110,8 +111,10 @@ func createStatSummary(statResultHistory 
cache.ResultStatHistory, filter cache.F
                                }
                        }
                        ssStats[statPrefix+statName] = ssStat
-               }
-               ss.Caches[cache] = ssStats
-       }
+                       return true
+               })
+               ss.Caches[cacheName] = ssStats
+               return true
+       })
        return ss
 }
diff --git a/traffic_monitor/ds/stat_test.go b/traffic_monitor/ds/stat_test.go
new file mode 100644
index 000000000..ba78db129
--- /dev/null
+++ b/traffic_monitor/ds/stat_test.go
@@ -0,0 +1,396 @@
+package ds
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+       "errors"
+       "fmt"
+       "math"
+       "math/rand"
+       "testing"
+       "time"
+
+       "github.com/apache/trafficcontrol/lib/go-tc"
+       "github.com/apache/trafficcontrol/traffic_monitor/cache"
+       "github.com/apache/trafficcontrol/traffic_monitor/dsdata"
+       "github.com/apache/trafficcontrol/traffic_monitor/health"
+       "github.com/apache/trafficcontrol/traffic_monitor/peer"
+       "github.com/apache/trafficcontrol/traffic_monitor/threadsafe"
+       "github.com/apache/trafficcontrol/traffic_monitor/todata"
+)
+
+func TestCreateStats(t *testing.T) {
+       toData := getMockTOData()
+       combinedCRStates := peer.NewCRStatesThreadsafe()
+       lastStatsThs := threadsafe.NewLastStats()
+       now := time.Now()
+       maxEvents := uint64(4)
+       events := health.NewThreadsafeEvents(maxEvents)
+       localCRStates := peer.NewCRStatesThreadsafe()
+
+       dses := []tc.DeliveryServiceName{}
+       for ds, _ := range toData.DeliveryServiceServers {
+               dses = append(dses, ds)
+       }
+
+       caches := []tc.CacheName{}
+       for cache, _ := range toData.ServerDeliveryServices {
+               caches = append(caches, cache)
+       }
+
+       for _, cache := range caches {
+               combinedCRStates.AddCache(cache, tc.IsAvailable{IsAvailable: 
true})
+               localCRStates.AddCache(cache, tc.IsAvailable{IsAvailable: true})
+       }
+
+       precomputeds := randCachesPrecomputedData(caches, toData)
+
+       monitorConfig := getMockMonitorConfig(dses)
+
+       dsStats, lastStats, err := CreateStats(precomputeds, toData, 
combinedCRStates.Get(), lastStatsThs.Get().Copy(), now, monitorConfig, events, 
localCRStates)
+
+       if err != nil {
+               t.Fatalf("CreateStats err expected: nil, actual: " + 
err.Error())
+       }
+
+       cgMap := map[tc.CacheGroupName]struct{}{}
+       for _, cg := range toData.ServerCachegroups {
+               cgMap[cg] = struct{}{}
+       }
+
+       tpMap := map[tc.CacheType]struct{}{}
+       for _, tp := range toData.ServerTypes {
+               tpMap[tp] = struct{}{}
+       }
+
+       caMap := map[tc.CacheName]struct{}{}
+       for ca, _ := range toData.ServerDeliveryServices {
+               caMap[ca] = struct{}{}
+       }
+
+       for dsName, dsStat := range dsStats.DeliveryService {
+               for cgName, cgStat := range dsStat.CacheGroups {
+                       if _, ok := cgMap[cgName]; !ok {
+                               t.Fatalf("CreateStats cachegroup expected: %+v, 
actual: %+v", cgMap, cgName)
+                       }
+
+                       cgExpected := cache.AStat{}
+                       for pCache, pData := range precomputeds {
+                               if toData.ServerCachegroups[pCache] != cgName {
+                                       continue
+                               }
+
+                               if pDataDS, ok := 
pData.DeliveryServiceStats[dsName]; ok {
+                                       cgExpected.InBytes += pDataDS.InBytes
+                                       cgExpected.OutBytes += pDataDS.OutBytes
+                                       cgExpected.Status2xx += 
pDataDS.Status2xx
+                                       cgExpected.Status3xx += 
pDataDS.Status3xx
+                                       cgExpected.Status4xx += 
pDataDS.Status4xx
+                                       cgExpected.Status5xx += 
pDataDS.Status5xx
+                               }
+                       }
+
+                       if errStr := compareAStatToStatCacheStats(&cgExpected, 
&cgStat); errStr != "" {
+                               t.Fatalf("CreateStats cachegroup " + 
string(cgName) + ": " + errStr)
+                       }
+
+               }
+
+               for tpName, tpStat := range dsStat.Types {
+                       if _, ok := tpMap[tpName]; !ok {
+                               t.Fatalf("CreateStats type expected: %+v, 
actual: %+v", tpMap, tpName)
+                       }
+
+                       tpExpected := cache.AStat{}
+                       for pCache, pData := range precomputeds {
+                               if toData.ServerTypes[pCache] != tpName {
+                                       continue
+                               }
+
+                               if pDataDS, ok := 
pData.DeliveryServiceStats[dsName]; ok {
+                                       tpExpected.InBytes += pDataDS.InBytes
+                                       tpExpected.OutBytes += pDataDS.OutBytes
+                                       tpExpected.Status2xx += 
pDataDS.Status2xx
+                                       tpExpected.Status3xx += 
pDataDS.Status3xx
+                                       tpExpected.Status4xx += 
pDataDS.Status4xx
+                                       tpExpected.Status5xx += 
pDataDS.Status5xx
+                               }
+                       }
+
+                       if errStr := compareAStatToStatCacheStats(&tpExpected, 
&tpStat); errStr != "" {
+                               t.Fatalf("CreateStats type " + string(tpName) + 
": " + errStr)
+                       }
+               }
+
+               for caName, caStat := range dsStat.Caches {
+                       if _, ok := caMap[caName]; !ok {
+                               t.Fatalf("CreateStats cache expected: %+v, 
actual: %+v", caMap, caName)
+                       }
+
+                       caExpected := cache.AStat{}
+                       for pCache, pData := range precomputeds {
+                               if pCache != caName {
+                                       continue
+                               }
+
+                               if pDataDS, ok := 
pData.DeliveryServiceStats[dsName]; ok {
+                                       caExpected.InBytes += pDataDS.InBytes
+                                       caExpected.OutBytes += pDataDS.OutBytes
+                                       caExpected.Status2xx += 
pDataDS.Status2xx
+                                       caExpected.Status3xx += 
pDataDS.Status3xx
+                                       caExpected.Status4xx += 
pDataDS.Status4xx
+                                       caExpected.Status5xx += 
pDataDS.Status5xx
+                               }
+                       }
+
+                       if errStr := compareAStatToStatCacheStats(&caExpected, 
&caStat); errStr != "" {
+                               t.Fatalf("CreateStats cache " + string(caName) 
+ ": " + errStr)
+                       }
+               }
+
+               {
+                       cmStat := dsStat.CommonStats
+
+                       if int(cmStat.CachesConfiguredNum.Value) != 
len(toData.DeliveryServiceServers[dsName]) {
+                               t.Fatalf("CreateStats 
CommonStats.CachesConfiguredNum expected: %+v actual: %+v", 
len(toData.DeliveryServiceServers[dsName]), 
dsStat.CommonStats.CachesConfiguredNum.Value)
+                       }
+
+                       for caName, reporting := range cmStat.CachesReporting {
+                               if _, ok := caMap[caName]; !ok {
+                                       t.Fatalf("CreateStats 
CommonStats.CachesReporting '%+v' not in test caches", caName)
+                               }
+                               if !reporting {
+                                       t.Fatalf("CreateStats 
len(CommonStats.CachesReporting[%+v] expected: true actual: false", caName)
+                               }
+                       }
+
+                       if cmStat.ErrorStr.Value != "" {
+                               t.Fatalf("CreateStats CommonStats.ErrorStr 
expected: '' actual: %+v", cmStat.ErrorStr.Value)
+                       }
+
+                       if cmStat.StatusStr.Value != "" {
+                               t.Fatalf("CreateStats CommonStats.StatusStr 
expected: '' actual: '%+v'", cmStat.StatusStr.Value)
+                       }
+               }
+       }
+
+       if len(lastStats.DeliveryServices) != 
len(toData.DeliveryServiceServers) {
+               t.Fatalf("CreateStats len(LastStats.DeliveryServices) expected: 
%+v actual: %+v", len(toData.DeliveryServiceServers), 
len(lastStats.DeliveryServices))
+       }
+
+       if len(lastStats.Caches) != len(toData.ServerDeliveryServices) {
+               t.Fatalf("CreateStats len(LastStats.Caches) expected: %+v 
actual: %+v", len(toData.ServerDeliveryServices), len(lastStats.Caches))
+       }
+
+}
+
+// compareAStatToStatCacheStats compares the two stats, and returns an error 
string, which is empty of both are equal.
+// The fields in StatCacheStats but not AStat are ignored.
+func compareAStatToStatCacheStats(expected *cache.AStat, actual 
*dsdata.StatCacheStats) string {
+       if actual.InBytes.Value != float64(expected.InBytes) {
+               return fmt.Sprintf("InBytes expected: \n%+v, actual: \n%+v", 
expected.InBytes, actual.InBytes.Value)
+       }
+
+       if actual.OutBytes.Value != int64(expected.OutBytes) {
+               return fmt.Sprintf("OutBytes expected: \n%+v, actual: \n%+v", 
expected.OutBytes, actual.OutBytes.Value)
+       }
+
+       if actual.Status2xx.Value != int64(expected.Status2xx) {
+               return fmt.Sprintf("Status2xx expected: \n%+v, actual: \n%+v", 
expected.Status2xx, actual.Status2xx.Value)
+       }
+
+       if actual.Status3xx.Value != int64(expected.Status3xx) {
+               return fmt.Sprintf("Status3xx expected: \n%+v, actual: \n%+v", 
expected.Status3xx, actual.Status3xx.Value)
+       }
+
+       if actual.Status4xx.Value != int64(expected.Status4xx) {
+               return fmt.Sprintf("Status4xx expected: \n%+v, actual: \n%+v", 
expected.Status4xx, actual.Status4xx.Value)
+       }
+
+       if actual.Status5xx.Value != int64(expected.Status5xx) {
+               return fmt.Sprintf("Status5xx expected: \n%+v, actual: \n%+v", 
expected.Status5xx, actual.Status5xx.Value)
+       }
+
+       if actual.ErrorString.Value != "" {
+               return fmt.Sprintf("ErrorString expected: empty, actual: %+v", 
actual.ErrorString.Value)
+       }
+
+       return ""
+}
+
+func getMockMonitorDSNoThresholds(name tc.DeliveryServiceName) 
tc.TMDeliveryService {
+       return tc.TMDeliveryService{
+               XMLID:              string(name),
+               TotalTPSThreshold:  math.MaxInt64,
+               ServerStatus:       string(tc.CacheStatusReported),
+               TotalKbpsThreshold: math.MaxInt64,
+       }
+}
+
+func getMockMonitorDSLowThresholds(name tc.DeliveryServiceName) 
tc.TMDeliveryService {
+       return tc.TMDeliveryService{
+               XMLID:              string(name),
+               TotalTPSThreshold:  1,
+               ServerStatus:       string(tc.CacheStatusReported),
+               TotalKbpsThreshold: 1,
+       }
+}
+
+func getMockMonitorConfig(dses []tc.DeliveryServiceName) 
tc.TrafficMonitorConfigMap {
+       mc := tc.TrafficMonitorConfigMap{
+               TrafficServer:   map[string]tc.TrafficServer{},
+               CacheGroup:      map[string]tc.TMCacheGroup{},
+               Config:          map[string]interface{}{},
+               TrafficMonitor:  map[string]tc.TrafficMonitor{},
+               DeliveryService: map[string]tc.TMDeliveryService{},
+               Profile:         map[string]tc.TMProfile{},
+       }
+
+       tmDSes := map[string]tc.TMDeliveryService{}
+       for _, ds := range dses {
+               tmDSes[string(ds)] = getMockMonitorDSNoThresholds(ds)
+       }
+       mc.DeliveryService = tmDSes
+
+       return mc
+}
+
+func getMockTOData() todata.TOData {
+       numCaches := 100
+       numDSes := 100
+       numCacheDSes := numDSes / 3
+       numCGs := 20
+
+       types := []tc.CacheType{tc.CacheTypeEdge, tc.CacheTypeEdge, 
tc.CacheTypeEdge, tc.CacheTypeEdge, tc.CacheTypeEdge, tc.CacheTypeMid}
+
+       caches := []tc.CacheName{}
+       for i := 0; i < numCaches; i++ {
+               caches = append(caches, tc.CacheName(randStr()))
+       }
+
+       dses := []tc.DeliveryServiceName{}
+       for i := 0; i < numDSes; i++ {
+               dses = append(dses, tc.DeliveryServiceName(randStr()))
+       }
+
+       cgs := []tc.CacheGroupName{}
+       for i := 0; i < numCGs; i++ {
+               cgs = append(cgs, tc.CacheGroupName(randStr()))
+       }
+
+       serverDSes := map[tc.CacheName][]tc.DeliveryServiceName{}
+       for _, ca := range caches {
+               for i := 0; i < numCacheDSes; i++ {
+                       serverDSes[ca] = append(serverDSes[ca], 
dses[rand.Intn(len(dses))])
+               }
+       }
+
+       dsServers := map[tc.DeliveryServiceName][]tc.CacheName{}
+       for server, dses := range serverDSes {
+               for _, ds := range dses {
+                       dsServers[ds] = append(dsServers[ds], server)
+               }
+       }
+
+       serverCGs := map[tc.CacheName]tc.CacheGroupName{}
+       for _, cache := range caches {
+               serverCGs[cache] = cgs[rand.Intn(len(cgs))]
+       }
+
+       serverTypes := map[tc.CacheName]tc.CacheType{}
+       for _, cache := range caches {
+               serverTypes[cache] = types[rand.Intn(len(types))]
+       }
+
+       tod := todata.New()
+       tod.DeliveryServiceServers = dsServers
+       tod.ServerDeliveryServices = serverDSes
+       tod.ServerTypes = serverTypes
+       tod.ServerCachegroups = serverCGs
+       return *tod
+}
+
+func randCachesPrecomputedData(caches []tc.CacheName, toData todata.TOData) 
map[tc.CacheName]cache.PrecomputedData {
+       prc := map[tc.CacheName]cache.PrecomputedData{}
+       for _, ca := range caches {
+               prc[ca] = randPrecomputedData(toData)
+       }
+       return prc
+}
+
+func randPrecomputedData(toData todata.TOData) cache.PrecomputedData {
+       dsStats := randDsStats(toData)
+       dsTotal := uint64(0)
+       for _, stat := range dsStats {
+               dsTotal += stat.OutBytes
+       }
+       return cache.PrecomputedData{
+               DeliveryServiceStats: dsStats,
+               OutBytes:             int64(dsTotal),
+               MaxKbps:              rand.Int63(),
+               Errors:               randErrs(),
+               Reporting:            true,
+       }
+}
+
+func randDsStats(toData todata.TOData) map[tc.DeliveryServiceName]*cache.AStat 
{
+       a := map[tc.DeliveryServiceName]*cache.AStat{}
+       for ds, _ := range toData.DeliveryServiceServers {
+               a[ds] = randAStat()
+       }
+       return a
+}
+
+func randAStat() *cache.AStat {
+       return &cache.AStat{
+               InBytes:   uint64(rand.Intn(1000)),
+               OutBytes:  uint64(rand.Intn(1000)),
+               Status2xx: uint64(rand.Intn(1000)),
+               Status3xx: uint64(rand.Intn(1000)),
+               Status4xx: uint64(rand.Intn(1000)),
+               Status5xx: uint64(rand.Intn(1000)),
+       }
+}
+
+func randErrs() []error {
+       if randBool() {
+               return []error{}
+       }
+       num := 5
+       errs := []error{}
+       for i := 0; i < num; i++ {
+               errs = append(errs, errors.New(randStr()))
+       }
+       return errs
+}
+
// randBool returns true or false with equal probability.
func randBool() bool {
	// rand.Int is non-negative, so testing the low bit is equivalent to testing evenness.
	return rand.Int()&1 == 0
}
+
// randStr returns a random 100-character string of ASCII letters, digits, '-' and '_'.
func randStr() string {
	const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_"
	const num = 100
	// Fill a byte slice and convert once: appending to a string in a loop reallocates and copies on every iteration.
	b := make([]byte, num)
	for i := range b {
		b[i] = chars[rand.Intn(len(chars))]
	}
	return string(b)
}
diff --git a/traffic_monitor/health/cache.go b/traffic_monitor/health/cache.go
index bf1fba74b..1b34ac9bd 100644
--- a/traffic_monitor/health/cache.go
+++ b/traffic_monitor/health/cache.go
@@ -95,28 +95,21 @@ func GetVitals(newResult *cache.Result, prevResult 
*cache.Result, mc *tc.Traffic
 // EvalCache returns whether the given cache should be marked available, a string describing why, and which stat exceeded a threshold. It takes no stat history; see EvalCacheWithStats for pollers which poll stats.
 // The availability of EvalCache MAY NOT be used to directly set the cache's 
local availability, because the threshold stats may not be part of the poller 
which produced the result. Rather, if the cache was previously unavailable from 
a threshold, it must be verified that threshold stat is in the results before 
setting the cache to available.
 // TODO change to return a `cache.AvailableStatus`
-func EvalCache(result cache.ResultInfo, resultStats 
cache.ResultStatValHistory, mc *tc.TrafficMonitorConfigMap) (bool, string, 
string) {
+func EvalCache(result cache.ResultInfo, mc *tc.TrafficMonitorConfigMap) (bool, 
string, string) {
        serverInfo, ok := mc.TrafficServer[string(result.ID)]
        if !ok {
                log.Errorf("Cache %v missing from from Traffic Ops Monitor 
Config - treating as OFFLINE\n", result.ID)
                return false, "ERROR - server missing in Traffic Ops monitor 
config", ""
        }
-       serverProfile, ok := mc.Profile[serverInfo.Profile]
-       if !ok {
-               log.Errorf("Cache %v profile %v missing from from Traffic Ops 
Monitor Config - treating as OFFLINE\n", result.ID, serverInfo.Profile)
-               return false, "ERROR - server profile missing in Traffic Ops 
monitor config", ""
-       }
-
-       status := tc.CacheStatusFromString(serverInfo.ServerStatus)
-       if status == tc.CacheStatusInvalid {
-               log.Errorf("Cache %v got invalid status from Traffic Ops '%v' - 
treating as Reported\n", result.ID, serverInfo.ServerStatus)
-       }
+       serverStatus := tc.CacheStatusFromString(serverInfo.ServerStatus)
+       return EvalCacheWithStatusInfo(result, mc, serverStatus, serverInfo)
+}
 
-       availability := "available"
+func EvalCacheWithStatusInfo(result cache.ResultInfo, mc 
*tc.TrafficMonitorConfigMap, status tc.CacheStatus, serverInfo 
tc.TrafficServer) (bool, string, string) {
+       availability := AvailableStr
        if !result.Available {
-               availability = "unavailable"
+               availability = UnavailableStr
        }
-
        switch {
        case status == tc.CacheStatusInvalid:
                log.Errorf("Cache %v got invalid status from Traffic Ops '%v' - 
treating as OFFLINE\n", result.ID, serverInfo.ServerStatus)
@@ -133,6 +126,37 @@ func EvalCache(result cache.ResultInfo, resultStats 
cache.ResultStatValHistory,
        case result.System.NotAvailable == true:
                return false, eventDesc(status, 
fmt.Sprintf("system.notAvailable == %v", result.System.NotAvailable)), ""
        }
+       return result.Available, eventDesc(status, availability), ""
+}
+
+const AvailableStr = "available"
+const UnavailableStr = "unavailable"
+
+// EvalCacheWithStats returns whether the given cache should be marked available, a string describing why, and which stat exceeded a threshold. Unlike EvalCache, unless the cache is ONLINE it also checks the stat history in resultStats against the thresholds of the cache's profile.
+// The availability of EvalCache MAY NOT be used to directly set the cache's 
local availability, because the threshold stats may not be part of the poller 
which produced the result. Rather, if the cache was previously unavailable from 
a threshold, it must be verified that threshold stat is in the results before 
setting the cache to available.
+// TODO change to return a `cache.AvailableStatus`
+func EvalCacheWithStats(result cache.ResultInfo, resultStats 
threadsafe.ResultStatValHistory, mc *tc.TrafficMonitorConfigMap) (bool, string, 
string) {
+       serverInfo, ok := mc.TrafficServer[string(result.ID)]
+       if !ok {
+               log.Errorf("Cache %v missing from from Traffic Ops Monitor 
Config - treating as OFFLINE\n", result.ID)
+               return false, "ERROR - server missing in Traffic Ops monitor 
config", ""
+       }
+       status := tc.CacheStatusFromString(serverInfo.ServerStatus)
+       if status == tc.CacheStatusOnline {
+               // return here first, even though EvalCacheWithStatus checks 
online, because we later assume that if EvalCacheWithStatus returns true, to 
return false if thresholds are exceeded; but, if the cache is ONLINE, we don't 
want to check thresholds.
+               return true, eventDesc(status, AvailableStr), ""
+       }
+
+       serverProfile, ok := mc.Profile[serverInfo.Profile]
+       if !ok {
+               log.Errorf("Cache %v profile %v missing from from Traffic Ops 
Monitor Config - treating as OFFLINE\n", result.ID, serverInfo.Profile)
+               return false, "ERROR - server profile missing in Traffic Ops 
monitor config", ""
+       }
+
+       avail, eventDescVal, eventMsg := EvalCacheWithStatusInfo(result, mc, 
status, serverInfo)
+       if !avail {
+               return avail, eventDescVal, eventMsg
+       }
 
        computedStats := cache.ComputedStats()
 
@@ -142,14 +166,8 @@ func EvalCache(result cache.ResultInfo, resultStats 
cache.ResultStatValHistory,
                        dummyCombinedstate := tc.IsAvailable{} // the only 
stats which use combinedState are things like isAvailable, which don't make 
sense to ever be thresholds.
                        resultStat = computedStatF(result, serverInfo, 
serverProfile, dummyCombinedstate)
                } else {
-                       if resultStats == nil {
-                               continue
-                       }
-                       resultStatHistory, ok := resultStats[stat]
-                       if !ok {
-                               continue
-                       }
-                       if len(resultStatHistory) < 1 {
+                       resultStatHistory := resultStats.Load(stat)
+                       if len(resultStatHistory) == 0 {
                                continue
                        }
                        resultStat = resultStatHistory[0].Val
@@ -166,20 +184,46 @@ func EvalCache(result cache.ResultInfo, resultStats 
cache.ResultStatValHistory,
                }
        }
 
-       return result.Available, eventDesc(status, availability), ""
+       return avail, eventDescVal, eventMsg
 }
 
-// CalcAvailability calculates the availability of the cache, from the given 
result. Availability is stored in `localCacheStatus` and `localStates`, and if 
the status changed an event is added to `events`. statResultHistory may be nil, 
for pollers which don't poll stats.
-// TODO add tc for poller names?
-func CalcAvailability(results []cache.Result, pollerName string, 
statResultHistory cache.ResultStatHistory, mc tc.TrafficMonitorConfigMap, 
toData todata.TOData, localCacheStatusThreadsafe 
threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events 
ThreadsafeEvents) {
+func CalcAvailabilityWithStats(results []cache.Result, pollerName string, 
statResultHistory threadsafe.ResultStatHistory, mc tc.TrafficMonitorConfigMap, 
toData todata.TOData, localCacheStatusThreadsafe 
threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events 
ThreadsafeEvents) {
        localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
        for _, result := range results {
-               statResults := cache.ResultStatValHistory(nil)
-               if statResultHistory != nil {
-                       statResults = statResultHistory[result.ID]
+               statResults := statResultHistory.LoadOrStore(result.ID)
+               isAvailable, whyAvailable, unavailableStat := 
EvalCacheWithStats(cache.ToInfo(result), statResults, &mc)
+
+               // if the cache is now Available, and was previously 
unavailable due to a threshold, make sure this poller contains the stat which 
exceeded the threshold.
+               if previousStatus, hasPreviousStatus := 
localCacheStatuses[result.ID]; isAvailable && hasPreviousStatus && 
!previousStatus.Available && previousStatus.UnavailableStat != "" {
+                       if !result.HasStat(previousStatus.UnavailableStat) {
+                               return
+                       }
                }
+               localCacheStatuses[result.ID] = cache.AvailableStatus{
+                       Available:       isAvailable,
+                       Status:          
mc.TrafficServer[string(result.ID)].ServerStatus,
+                       Why:             whyAvailable,
+                       UnavailableStat: unavailableStat,
+                       Poller:          pollerName,
+               } // TODO move within localStates?
+
+               if available, ok := localStates.GetCache(result.ID); !ok || 
available.IsAvailable != isAvailable {
+                       log.Infof("Changing state for %s was: %t now: %t 
because %s poller: %v error: %v", result.ID, available.IsAvailable, 
isAvailable, whyAvailable, pollerName, result.Error)
+                       events.Add(Event{Time: Time(time.Now()), Description: 
whyAvailable + " (" + pollerName + ")", Name: string(result.ID), Hostname: 
string(result.ID), Type: toData.ServerTypes[result.ID].String(), Available: 
isAvailable})
+               }
+
+               localStates.SetCache(result.ID, tc.IsAvailable{IsAvailable: 
isAvailable})
+       }
+       calculateDeliveryServiceState(toData.DeliveryServiceServers, 
localStates, toData)
+       localCacheStatusThreadsafe.Set(localCacheStatuses)
+}
 
-               isAvailable, whyAvailable, unavailableStat := 
EvalCache(cache.ToInfo(result), statResults, &mc)
+// CalcAvailability calculates the availability of the cache, from the given result. Availability is stored in `localCacheStatusThreadsafe` and `localStates`, and if the status changed an event is added to `events`. It does not use stat history; pollers which poll stats use CalcAvailabilityWithStats.
+// TODO add tc for poller names?
+func CalcAvailability(results []cache.Result, pollerName string, mc 
tc.TrafficMonitorConfigMap, toData todata.TOData, localCacheStatusThreadsafe 
threadsafe.CacheAvailableStatus, localStates peer.CRStatesThreadsafe, events 
ThreadsafeEvents) {
+       localCacheStatuses := localCacheStatusThreadsafe.Get().Copy()
+       for _, result := range results {
+               isAvailable, whyAvailable, unavailableStat := 
EvalCache(cache.ToInfo(result), &mc)
 
                // if the cache is now Available, and was previously 
unavailable due to a threshold, make sure this poller contains the stat which 
exceeded the threshold.
                if previousStatus, hasPreviousStatus := 
localCacheStatuses[result.ID]; isAvailable && hasPreviousStatus && 
!previousStatus.Available && previousStatus.UnavailableStat != "" {
diff --git a/traffic_monitor/manager/health.go 
b/traffic_monitor/manager/health.go
index fa47fba98..6fad8aae1 100644
--- a/traffic_monitor/manager/health.go
+++ b/traffic_monitor/manager/health.go
@@ -13,7 +13,7 @@ package manager
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
@@ -184,7 +184,7 @@ func processHealthResult(
                healthHistoryCopy[healthResult.ID] = 
pruneHistory(append([]cache.Result{healthResult}, 
healthHistoryCopy[healthResult.ID]...), maxHistory)
        }
 
-       health.CalcAvailability(results, "health", nil, monitorConfigCopy, 
toDataCopy, localCacheStatusThreadsafe, localStates, events)
+       health.CalcAvailability(results, "health", monitorConfigCopy, 
toDataCopy, localCacheStatusThreadsafe, localStates, events)
 
        healthHistory.Set(healthHistoryCopy)
        // TODO determine if we should combineCrStates() here
diff --git a/traffic_monitor/manager/stat.go b/traffic_monitor/manager/stat.go
index ff930e708..2bde97d1f 100644
--- a/traffic_monitor/manager/stat.go
+++ b/traffic_monitor/manager/stat.go
@@ -159,7 +159,6 @@ func processStatResults(
 
        // setting the statHistory could be put in a goroutine concurrent with 
`ds.CreateStats`, if it were slow
        statInfoHistory := statInfoHistoryThreadsafe.Get().Copy()
-       statResultHistory := statResultHistoryThreadsafe.Get().Copy()
        statMaxKbpses := statMaxKbpsesThreadsafe.Get().Copy()
 
        for i, result := range results {
@@ -179,7 +178,7 @@ func processStatResults(
                        }
                }
                statInfoHistory.Add(result, maxStats)
-               if err := statResultHistory.Add(result, maxStats); err != nil {
+               if err := statResultHistoryThreadsafe.Add(result, maxStats); 
err != nil {
                        log.Errorf("Adding result from %v: %v\n", result.ID, 
err)
                }
                // Don't add errored maxes or precomputed DSStats
@@ -196,7 +195,6 @@ func processStatResults(
                lastResults[result.ID] = result
        }
        statInfoHistoryThreadsafe.Set(statInfoHistory)
-       statResultHistoryThreadsafe.Set(statResultHistory)
        statMaxKbpsesThreadsafe.Set(statMaxKbpses)
 
        newDsStats, newLastStats, err := ds.CreateStats(precomputedData, 
toData, combinedStates, lastStats.Get().Copy(), time.Now(), mc, events, 
localStates)
@@ -208,7 +206,7 @@ func processStatResults(
                lastStats.Set(newLastStats)
        }
 
-       health.CalcAvailability(results, "stat", statResultHistory, mc, toData, 
localCacheStatusThreadsafe, localStates, events)
+       health.CalcAvailabilityWithStats(results, "stat", 
statResultHistoryThreadsafe, mc, toData, localCacheStatusThreadsafe, 
localStates, events)
        combineState()
 
        endTime := time.Now()
diff --git a/traffic_monitor/threadsafe/resultstathistory.go 
b/traffic_monitor/threadsafe/resultstathistory.go
index 0dc5855db..05b679318 100644
--- a/traffic_monitor/threadsafe/resultstathistory.go
+++ b/traffic_monitor/threadsafe/resultstathistory.go
@@ -21,38 +21,19 @@ package threadsafe
  */
 
 import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "net/url"
        "sync"
+       "time"
 
+       "github.com/apache/trafficcontrol/lib/go-log"
+       "github.com/apache/trafficcontrol/lib/go-tc"
        "github.com/apache/trafficcontrol/traffic_monitor/cache"
+       "github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
 )
 
-// ResultStatHistory provides safe access for multiple goroutines readers and 
a single writer to a stored HistoryHistory object.
-// This could be made lock-free, if the performance was necessary
-type ResultStatHistory struct {
-       history *cache.ResultStatHistory
-       m       *sync.RWMutex
-}
-
-// NewResultStatHistory returns a new ResultStatHistory safe for multiple 
readers and a single writer.
-func NewResultStatHistory() ResultStatHistory {
-       h := cache.ResultStatHistory{}
-       return ResultStatHistory{m: &sync.RWMutex{}, history: &h}
-}
-
-// Get returns the ResultStatHistory. Callers MUST NOT modify. If mutation is 
necessary, call ResultStatHistory.Copy()
-func (h *ResultStatHistory) Get() cache.ResultStatHistory {
-       h.m.RLock()
-       defer h.m.RUnlock()
-       return *h.history
-}
-
-// Set sets the internal ResultStatHistory. This is only safe for one thread 
of execution. This MUST NOT be called from multiple threads.
-func (h *ResultStatHistory) Set(v cache.ResultStatHistory) {
-       h.m.Lock()
-       *h.history = v
-       h.m.Unlock()
-}
-
 // ResultStatHistory provides safe access for multiple goroutines readers and 
a single writer to a stored HistoryHistory object.
 // This could be made lock-free, if the performance was necessary
 type ResultInfoHistory struct {
@@ -79,3 +60,199 @@ func (h *ResultInfoHistory) Set(v cache.ResultInfoHistory) {
        *h.history = v
        h.m.Unlock()
 }
+
+type ResultStatHistory struct{ *sync.Map } // 
map[tc.CacheName]ResultStatValHistory
+
+func NewResultStatHistory() ResultStatHistory {
+       return ResultStatHistory{&sync.Map{}}
+}
+
+func (h ResultStatHistory) LoadOrStore(cache tc.CacheName) 
ResultStatValHistory {
+       // TODO change to use sync.Pool?
+       v, _ := h.Map.LoadOrStore(cache, NewResultStatValHistory())
+       return v.(ResultStatValHistory)
+}
+
+// Range behaves like sync.Map.Range. It calls f for every value in the map; 
if f returns false, the iteration is stopped.
+func (h ResultStatHistory) Range(f func(cache tc.CacheName, val 
ResultStatValHistory) bool) {
+       h.Map.Range(func(k, v interface{}) bool {
+               return f(k.(tc.CacheName), v.(ResultStatValHistory))
+       })
+}
+
+// ResultStatValHistory is threadsafe for one writer. Specifically, because a 
CompareAndSwap is not provided, it's not possible to Load and Store without a 
race condition.
+// If multiple writers were necessary, it wouldn't be difficult to add a 
CompareAndSwap, internally storing an atomically-accessed pointer to the slice.
+type ResultStatValHistory struct{ *sync.Map } //  map[string][]ResultStatVal
+
+func NewResultStatValHistory() ResultStatValHistory { return 
ResultStatValHistory{&sync.Map{}} }
+
+// Load returns the []ResultStatVal for the given stat. If the given stat does 
not exist, nil is returned.
+func (h ResultStatValHistory) Load(stat string) []cache.ResultStatVal {
+       v, ok := h.Map.Load(stat)
+       if !ok {
+               return nil
+       }
+       return v.([]cache.ResultStatVal)
+}
+
+// Range behaves like sync.Map.Range. It calls f for every value in the map; 
if f returns false, the iteration is stopped.
+func (h ResultStatValHistory) Range(f func(stat string, val 
[]cache.ResultStatVal) bool) {
+       h.Map.Range(func(k, v interface{}) bool {
+               return f(k.(string), v.([]cache.ResultStatVal))
+       })
+}
+
+// Store stores the given []ResultStatVal in the ResultStatValHistory for the 
given stat. Store is threadsafe for only one writer.
+// Specifically, if there are multiple writers, there's a race, that one 
writer could Load(), another writer could Store() underneath it, and the first 
writer would then Store() having lost values.
+// To safely use ResultStatValHistory with multiple writers, a CompareAndSwap 
function would have to be added.
+func (h ResultStatValHistory) Store(stat string, vals []cache.ResultStatVal) {
+       h.Map.Store(stat, vals)
+}
+
+func (a ResultStatHistory) Add(r cache.Result, limit uint64) error {
+       errStrs := ""
+       resultHistory := a.LoadOrStore(r.ID)
+       if limit == 0 {
+               log.Warnln("ResultStatHistory.Add got limit 0 - setting to 1")
+               limit = 1
+       }
+
+       for statName, statVal := range r.Astats.Ats {
+               statHistory := resultHistory.Load(statName)
+               if len(statHistory) == 0 {
+                       statHistory = make([]cache.ResultStatVal, 0, limit) // 
initialize to the limit, to avoid multiple allocations. TODO put in 
.Load(statName, defaultSize)?
+               }
+
+               // TODO check len(statHistory) == 0 before indexing, potential 
panic?
+
+               ok, err := newStatEqual(statHistory, statVal)
+
+               // If the new stat value is the same as the last, update the 
time and increment the span. Span is the number of polls the latest value has 
been the same, and hence the length of time it's been the same is 
span*pollInterval.
+               if err != nil {
+                       errStrs += "cannot add stat " + statName + ": " + 
err.Error() + "; "
+               } else if ok {
+                       statHistory[0].Time = r.Time
+                       statHistory[0].Span++
+               } else {
+                       resultVal := cache.ResultStatVal{
+                               Val:  statVal,
+                               Time: r.Time,
+                               Span: 1,
+                       }
+
+                       if len(statHistory) > int(limit) {
+                               statHistory = statHistory[:int(limit)]
+                       } else if len(statHistory) < int(limit) {
+                               statHistory = append(statHistory, 
cache.ResultStatVal{})
+                       }
+                       // shift all values to the right, in order to put the 
new val at the beginning. Faster than allocating memory again
+                       for i := len(statHistory) - 1; i >= 1; i-- {
+                               statHistory[i] = statHistory[i-1]
+                       }
+                       statHistory[0] = resultVal // new result at the 
beginning
+               }
+               resultHistory.Store(statName, statHistory)
+       }
+
+       if errStrs != "" {
+               return errors.New("some stats could not be added: " + 
errStrs[:len(errStrs)-2])
+       }
+       return nil
+}
+
+// newStatEqual Returns whether the given stat is equal to the latest stat in 
history. If len(history)==0, this returns false without error. If the given 
stat is not a JSON primitive (string, number, bool), this returns an error. We 
explicitly refuse to compare arrays and objects, for performance.
+func newStatEqual(history []cache.ResultStatVal, stat interface{}) (bool, 
error) {
+       if len(history) == 0 {
+               return false, nil // if there's no history, it's "not equal", 
i.e. store this new history
+       }
+       switch stat.(type) {
+       case string:
+       case float64:
+       case bool:
+       default:
+               return false, fmt.Errorf("incomparable stat type %T", stat)
+       }
+       switch history[0].Val.(type) {
+       case string:
+       case float64:
+       case bool:
+       default:
+               return false, fmt.Errorf("incomparable history stat type %T", 
stat)
+       }
+       return stat == history[0].Val, nil
+}
+
+// StatsMarshall encodes the stats in JSON, encoding up to historyCount of 
each stat. If statsToUse is empty, all stats are encoded; otherwise, only the 
given stats are encoded. If wildcard is true, stats which contain the text in 
each statsToUse are returned, instead of exact stat names. If cacheType is not 
CacheTypeInvalid, only stats for the given type are returned. If hosts is not 
empty, only the given hosts are returned.
+func StatsMarshall(statResultHistory ResultStatHistory, statInfo 
cache.ResultInfoHistory, combinedStates tc.CRStates, monitorConfig 
tc.TrafficMonitorConfigMap, statMaxKbpses cache.Kbpses, filter cache.Filter, 
params url.Values) ([]byte, error) {
+       stats := cache.Stats{
+               CommonAPIData: srvhttp.GetCommonAPIData(params, time.Now()),
+               Caches:        
map[tc.CacheName]map[string][]cache.ResultStatVal{},
+       }
+
+       computedStats := cache.ComputedStats()
+
+       // TODO in 1.0, stats are divided into 'location', 'cache', and 'type'. 
'cache' are hidden by default.
+
+       for id, combinedStatesCache := range combinedStates.Caches {
+               if !filter.UseCache(id) {
+                       continue
+               }
+
+               cacheStatResultHistory := statResultHistory.LoadOrStore(id)
+               cacheStatResultHistory.Range(func(stat string, vals 
[]cache.ResultStatVal) bool {
+                       stat = "ats." + stat // TM1 prefixes ATS stats with 
'ats.'
+                       if !filter.UseStat(stat) {
+                               return true
+                       }
+                       historyCount := 1
+                       for _, val := range vals {
+                               if !filter.WithinStatHistoryMax(historyCount) {
+                                       break
+                               }
+                               if _, ok := stats.Caches[id]; !ok {
+                                       stats.Caches[id] = 
map[string][]cache.ResultStatVal{}
+                               }
+                               stats.Caches[id][stat] = 
append(stats.Caches[id][stat], val)
+                               historyCount += int(val.Span)
+                       }
+                       return true
+               })
+
+               serverInfo, ok := monitorConfig.TrafficServer[string(id)]
+               if !ok {
+                       log.Warnf("cache.StatsMarshall server %s missing from 
monitorConfig\n", id)
+               }
+
+               serverProfile, ok := monitorConfig.Profile[serverInfo.Profile]
+               if !ok {
+                       log.Warnf("cache.StatsMarshall server %s missing 
profile in monitorConfig\n", id)
+               }
+
+               for i, resultInfo := range statInfo[id] {
+                       if !filter.WithinStatHistoryMax(i + 1) {
+                               break
+                       }
+                       if _, ok := stats.Caches[id]; !ok {
+                               stats.Caches[id] = 
map[string][]cache.ResultStatVal{}
+                       }
+
+                       t := resultInfo.Time
+
+                       for stat, statValF := range computedStats {
+                               if !filter.UseStat(stat) {
+                                       continue
+                               }
+                               stats.Caches[id][stat] = 
append(stats.Caches[id][stat], cache.ResultStatVal{Val: statValF(resultInfo, 
serverInfo, serverProfile, combinedStatesCache), Time: t, Span: 1}) // 
combinedState will default to unavailable
+                       }
+               }
+       }
+
+       return json.Marshal(stats)
+}
+
+func pruneStats(history []cache.ResultStatVal, limit uint64) 
[]cache.ResultStatVal {
+       if uint64(len(history)) > limit {
+               history = history[:limit-1]
+       }
+       return history
+}
diff --git a/traffic_monitor/threadsafe/resultstathistory_test.go 
b/traffic_monitor/threadsafe/resultstathistory_test.go
new file mode 100644
index 000000000..ab6f21625
--- /dev/null
+++ b/traffic_monitor/threadsafe/resultstathistory_test.go
@@ -0,0 +1,168 @@
+package threadsafe
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import (
+       "encoding/json"
+       "fmt"
+       "math/rand"
+       "net/url"
+       "testing"
+       "time"
+
+       "github.com/apache/trafficcontrol/lib/go-tc"
+       "github.com/apache/trafficcontrol/traffic_monitor/cache"
+       "github.com/apache/trafficcontrol/traffic_monitor/srvhttp"
+)
+
+func randResultStatHistory() ResultStatHistory {
+       hist := NewResultStatHistory()
+
+       num := 5
+       for i := 0; i < num; i++ {
+               hist.Store(tc.CacheName(randStr()), randResultStatValHistory())
+       }
+       return hist
+}
+
+func randResultStatValHistory() ResultStatValHistory {
+       a := NewResultStatValHistory()
+       num := 5
+       numSlice := 5
+       for i := 0; i < num; i++ {
+               cacheName := randStr()
+               vals := []cache.ResultStatVal{}
+               for j := 0; j < numSlice; j++ {
+                       vals = append(vals, randResultStatVal())
+               }
+               a.Store(cacheName, vals)
+       }
+       return a
+}
+
+// randResultStatVal returns a ResultStatVal with a random non-negative Val and
+// Span, stamped with the current time.
+func randResultStatVal() cache.ResultStatVal {
+       val := uint64(rand.Int63())
+       now := time.Now()
+       span := uint64(rand.Int63())
+       return cache.ResultStatVal{Val: val, Time: now, Span: span}
+}
+
+func randResultInfoHistory() cache.ResultInfoHistory {
+       // type ResultInfoHistory map[tc.CacheName][]ResultInfo
+       hist := cache.ResultInfoHistory{}
+
+       num := 5
+       infNum := 5
+       for i := 0; i < num; i++ {
+               cacheName := tc.CacheName(randStr())
+               for j := 0; j < infNum; j++ {
+                       hist[cacheName] = append(hist[cacheName], 
randResultInfo())
+               }
+       }
+       return hist
+}
+
+// randResultInfo returns a ResultInfo filled with random test data.
+//
+// The random error text goes through a constant "%s" format rather than being
+// used as the format string itself, which go vet's printf check flags.
+// RequestTime is bounded to under an hour, because multiplying an unbounded
+// rand.Int() by time.Millisecond overflows int64 and can yield negative
+// durations.
+func randResultInfo() cache.ResultInfo {
+       return cache.ResultInfo{
+               ID:          tc.CacheName(randStr()),
+               Error:       fmt.Errorf("%s", randStr()),
+               Time:        time.Now(),
+               RequestTime: time.Duration(rand.Int63n(int64(time.Hour/time.Millisecond))) * time.Millisecond,
+               Vitals:      randVitals(),
+               PollID:      uint64(rand.Int63()),
+               Available:   randBool(),
+       }
+}
+
+func randVitals() cache.Vitals {
+       return cache.Vitals{
+               LoadAvg:    rand.Float64(),
+               BytesOut:   rand.Int63(),
+               BytesIn:    rand.Int63(),
+               KbpsOut:    rand.Int63(),
+               MaxKbpsOut: rand.Int63(),
+       }
+}
+
+func randStr() string {
+       chars := 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_"
+       num := 100
+       s := ""
+       for i := 0; i < num; i++ {
+               s += string(chars[rand.Intn(len(chars))])
+       }
+       return s
+}
+
+// randBool returns true or false with equal probability, based on the low bit
+// of a non-negative random int.
+func randBool() bool {
+       return rand.Int()&1 == 0
+}
+
+// DummyFilterNever is a test filter that rejects every stat, every cache, and
+// every history index, so nothing passes through it.
+type DummyFilterNever struct{}
+
+// UseStat always reports false.
+func (DummyFilterNever) UseStat(name string) bool { return false }
+
+// UseCache always reports false.
+func (DummyFilterNever) UseCache(name tc.CacheName) bool { return false }
+
+// WithinStatHistoryMax always reports false.
+func (DummyFilterNever) WithinStatHistoryMax(i int) bool { return false }
+
+func TestStatsMarshall(t *testing.T) {
+       statHist := randResultStatHistory()
+       infHist := randResultInfoHistory()
+       filter := DummyFilterNever{}
+       params := url.Values{}
+       beforeStatsMarshall := time.Now()
+       bytes, err := StatsMarshall(statHist, infHist, tc.CRStates{}, 
tc.TrafficMonitorConfigMap{}, cache.Kbpses{}, filter, params)
+       afterStatsMarshall := time.Now()
+       if err != nil {
+               t.Fatalf("StatsMarshall return expected nil err, actual err: 
%v", err)
+       }
+       // if len(bytes) > 0 {
+       //      t.Errorf("expected empty bytes, actual: %v", string(bytes))
+       // }
+
+       stats := cache.Stats{}
+       if err := json.Unmarshal(bytes, &stats); err != nil {
+               t.Fatalf("unmarshalling expected nil err, actual err: %v", err)
+       }
+
+       if stats.CommonAPIData.QueryParams != "" {
+               t.Errorf(`unmarshalling stats.CommonAPIData.QueryParams 
expected "", actual %v`, stats.CommonAPIData.QueryParams)
+       }
+
+       statsDate, err := time.Parse(srvhttp.CommonAPIDataDateFormat, 
stats.CommonAPIData.DateStr)
+       if err != nil {
+               t.Errorf(`stats.CommonAPIData.DateStr expected format %v, 
actual %v`, srvhttp.CommonAPIDataDateFormat, stats.CommonAPIData.DateStr)
+       }
+       if beforeStatsMarshall.Truncate(time.Second).After(statsDate) || 
statsDate.Truncate(time.Second).After(afterStatsMarshall.Truncate(time.Second)) 
{ // round to second, because CommonAPIDataDateFormat is second-precision
+               t.Errorf(`unmarshalling stats.CommonAPIData.DateStr expected 
between %v and %v, actual %v`, beforeStatsMarshall, afterStatsMarshall, 
stats.CommonAPIData.DateStr)
+       }
+       if len(stats.Caches) > 0 {
+               t.Errorf(`unmarshalling stats.Caches expected empty, actual 
%+v`, stats.Caches)
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to