mtorluemke closed pull request #1768: Add Traffic Monitor Plugin System for 
Cache Stats Formats
URL: https://github.com/apache/incubator-trafficcontrol/pull/1768
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/lib/go-tc/traffic_monitor.go b/lib/go-tc/traffic_monitor.go
index d8dbba8201..2597ed5c3c 100644
--- a/lib/go-tc/traffic_monitor.go
+++ b/lib/go-tc/traffic_monitor.go
@@ -96,6 +96,7 @@ type TMProfile struct {
 type TMParameters struct {
        HealthConnectionTimeout int    `json:"health.connection.timeout"`
        HealthPollingURL        string `json:"health.polling.url"`
+       HealthPollingFormat     string `json:"health.polling.format"`
        HistoryCount            int    `json:"history.count"`
        MinFreeKbps             int64
        Thresholds              map[string]HealthThreshold 
`json:"health_threshold"`
diff --git a/traffic_monitor/cache/cache.go b/traffic_monitor/cache/cache.go
index 6c04e97fdc..0c9f5c212d 100644
--- a/traffic_monitor/cache/cache.go
+++ b/traffic_monitor/cache/cache.go
@@ -24,9 +24,6 @@ import (
        "fmt"
        "io"
        "net/url"
-       "regexp"
-       "strconv"
-       "strings"
        "time"
 
        "github.com/apache/incubator-trafficcontrol/lib/go-log"
@@ -38,10 +35,8 @@ import (
 
 // Handler is a cache handler, which fulfills the common/handler `Handler` 
interface.
 type Handler struct {
-       resultChan         chan Result
-       Notify             int
-       ToData             *todata.TODataThreadsafe
-       MultipleSpaceRegex *regexp.Regexp
+       resultChan chan Result
+       ToData     *todata.TODataThreadsafe
 }
 
 func (h Handler) ResultChan() <-chan Result {
@@ -50,12 +45,12 @@ func (h Handler) ResultChan() <-chan Result {
 
 // NewHandler returns a new cache handler. Note this handler does NOT 
precomputes stat data before calling ResultChan, and Result.Precomputed will be 
nil
 func NewHandler() Handler {
-       return Handler{resultChan: make(chan Result), MultipleSpaceRegex: 
regexp.MustCompile(" +")}
+       return Handler{resultChan: make(chan Result)}
 }
 
 // NewPrecomputeHandler constructs a new cache Handler, which precomputes stat 
data and populates result.Precomputed before passing to ResultChan.
 func NewPrecomputeHandler(toData todata.TODataThreadsafe) Handler {
-       return Handler{resultChan: make(chan Result), MultipleSpaceRegex: 
regexp.MustCompile(" +"), ToData: &toData}
+       return Handler{resultChan: make(chan Result), ToData: &toData}
 }
 
 // Precompute returns whether this handler precomputes data before passing the 
result to the ResultChan
@@ -277,8 +272,8 @@ func StatsMarshall(statResultHistory ResultStatHistory, 
statInfo ResultInfoHisto
 }
 
 // Handle handles results fetched from a cache, parsing the raw Reader data 
and passing it along to a chan for further processing.
-func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, 
reqEnd time.Time, reqErr error, pollID uint64, pollFinished chan<- uint64) {
-       log.Debugf("poll %v %v handle start\n", pollID, time.Now())
+func (handler Handler) Handle(id string, r io.Reader, format string, reqTime 
time.Duration, reqEnd time.Time, reqErr error, pollID uint64, pollFinished 
chan<- uint64) {
+       log.Debugf("poll %v %v (format '%v') handle start\n", pollID, 
time.Now(), format)
        result := Result{
                ID:           tc.CacheName(id),
                Time:         reqEnd,
@@ -301,260 +296,36 @@ func (handler Handler) Handle(id string, r io.Reader, 
reqTime time.Duration, req
                return
        }
 
-       result.PrecomputedData.Reporting = true
-       result.PrecomputedData.Time = result.Time
+       statDecoder, ok := StatsTypeDecoders[format]
+       if !ok {
+               log.Errorf("Handler cache '%s' stat type '%s' not found! 
Returning handle error for this cache poll.\n", id, format)
+               result.Error = fmt.Errorf("handler stat type %s missing")
+               handler.resultChan <- result
+               return
+       }
 
-       if decodeErr := json.NewDecoder(r).Decode(&result.Astats); decodeErr != 
nil {
-               log.Warnf("%s procnetdev decode error '%v'\n", id, decodeErr)
+       decodeErr := error(nil)
+       if decodeErr, result.Astats.Ats, result.Astats.System = 
statDecoder.Parse(result.ID, r); decodeErr != nil {
+               log.Warnf("%s decode error '%v'\n", id, decodeErr)
                result.Error = decodeErr
                handler.resultChan <- result
                return
        }
 
        if result.Astats.System.ProcNetDev == "" {
-               log.Warnf("addkbps %s procnetdev empty\n", id)
+               log.Warnf("Handler cache %s procnetdev empty\n", id)
        }
-
        if result.Astats.System.InfSpeed == 0 {
-               log.Warnf("addkbps %s inf.speed empty\n", id)
+               log.Warnf("Handler cache %s inf.speed empty\n", id)
        }
 
-       if reqErr != nil {
-               result.Error = reqErr
-               log.Errorf("addkbps handle %s error '%v'\n", id, reqErr)
-       } else {
-               result.Available = true
-       }
+       result.Available = true
 
        if handler.Precompute() {
-               result = handler.precompute(result)
+               result.PrecomputedData = statDecoder.Precompute(result.ID, 
handler.ToData.Get(), result.Astats.Ats, result.Astats.System)
        }
+       result.PrecomputedData.Reporting = true
+       result.PrecomputedData.Time = result.Time
 
        handler.resultChan <- result
 }
-
-// outBytes takes the proc.net.dev string, and the interface name, and returns 
the bytes field
-func outBytes(procNetDev, iface string, multipleSpaceRegex *regexp.Regexp) 
(int64, error) {
-       if procNetDev == "" {
-               return 0, fmt.Errorf("procNetDev empty")
-       }
-       if iface == "" {
-               return 0, fmt.Errorf("iface empty")
-       }
-       ifacePos := strings.Index(procNetDev, iface)
-       if ifacePos == -1 {
-               return 0, fmt.Errorf("interface '%s' not found in proc.net.dev 
'%s'", iface, procNetDev)
-       }
-
-       procNetDevIfaceBytes := procNetDev[ifacePos+len(iface)+1:]
-       procNetDevIfaceBytes = strings.TrimLeft(procNetDevIfaceBytes, " ")
-       procNetDevIfaceBytes = 
multipleSpaceRegex.ReplaceAllLiteralString(procNetDevIfaceBytes, " ")
-       procNetDevIfaceBytesArr := strings.Split(procNetDevIfaceBytes, " ") // 
this could be made faster with a custom function (DFA?) that splits and ignores 
duplicate spaces at the same time
-       if len(procNetDevIfaceBytesArr) < 10 {
-               return 0, fmt.Errorf("proc.net.dev iface '%v' unknown format 
'%s'", iface, procNetDev)
-       }
-       procNetDevIfaceBytes = procNetDevIfaceBytesArr[8]
-
-       return strconv.ParseInt(procNetDevIfaceBytes, 10, 64)
-}
-
-// precompute does the calculations which are possible with only this one 
cache result.
-// TODO precompute ResultStatVal
-func (handler Handler) precompute(result Result) Result {
-       todata := handler.ToData.Get()
-       stats := map[tc.DeliveryServiceName]dsdata.Stat{}
-
-       var err error
-       if result.PrecomputedData.OutBytes, err = 
outBytes(result.Astats.System.ProcNetDev, result.Astats.System.InfName, 
handler.MultipleSpaceRegex); err != nil {
-               result.PrecomputedData.OutBytes = 0
-               log.Errorf("addkbps %s handle precomputing outbytes '%v'\n", 
result.ID, err)
-       }
-
-       kbpsInMbps := int64(1000)
-       result.PrecomputedData.MaxKbps = int64(result.Astats.System.InfSpeed) * 
kbpsInMbps
-
-       for stat, value := range result.Astats.Ats {
-               var err error
-               stats, err = processStat(result.ID, stats, todata, stat, value, 
result.Time)
-               if err != nil && err != dsdata.ErrNotProcessedStat {
-                       log.Infof("precomputing cache %v stat %v value %v error 
%v", result.ID, stat, value, err)
-                       result.PrecomputedData.Errors = 
append(result.PrecomputedData.Errors, err)
-               }
-       }
-       result.PrecomputedData.DeliveryServiceStats = stats
-       return result
-}
-
-// processStat and its subsidiary functions act as a State Machine, flowing 
the stat thru states for each "." component of the stat name
-func processStat(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
value interface{}, timeReceived time.Time) 
(map[tc.DeliveryServiceName]dsdata.Stat, error) {
-       parts := strings.Split(stat, ".")
-       if len(parts) < 1 {
-               return stats, fmt.Errorf("stat has no initial part")
-       }
-
-       switch parts[0] {
-       case "plugin":
-               return processStatPlugin(server, stats, toData, stat, 
parts[1:], value, timeReceived)
-       case "proxy":
-               return stats, dsdata.ErrNotProcessedStat
-       case "server":
-               return stats, dsdata.ErrNotProcessedStat
-       default:
-               return stats, fmt.Errorf("stat '%s' has unknown initial part 
'%s'", stat, parts[0])
-       }
-}
-
-func processStatPlugin(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
statParts []string, value interface{}, timeReceived time.Time) 
(map[tc.DeliveryServiceName]dsdata.Stat, error) {
-       if len(statParts) < 1 {
-               return stats, fmt.Errorf("stat has no plugin part")
-       }
-       switch statParts[0] {
-       case "remap_stats":
-               return processStatPluginRemapStats(server, stats, toData, stat, 
statParts[1:], value, timeReceived)
-       default:
-               return stats, fmt.Errorf("stat has unknown plugin part '%s'", 
statParts[0])
-       }
-}
-
-func processStatPluginRemapStats(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
statParts []string, value interface{}, timeReceived time.Time) 
(map[tc.DeliveryServiceName]dsdata.Stat, error) {
-       if len(statParts) < 3 {
-               return stats, fmt.Errorf("stat has no remap_stats 
deliveryservice and name parts")
-       }
-
-       // the FQDN is `subsubdomain`.`subdomain`.`domain`. For a HTTP delivery 
service, `subsubdomain` will be the cache hostname; for a DNS delivery service, 
it will be `edge`. Then, `subdomain` is the delivery service regex.
-       subsubdomain := statParts[0]
-       subdomain := statParts[1]
-       domain := strings.Join(statParts[2:len(statParts)-1], ".")
-
-       ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, 
subdomain, subsubdomain)
-       if !ok {
-               fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
-               return stats, fmt.Errorf("ERROR no delivery service match for 
fqdn '%v' stat '%v'\n", fqdn, strings.Join(statParts, "."))
-       }
-       if ds == "" {
-               fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
-               return stats, fmt.Errorf("ERROR EMPTY delivery service fqdn %v 
stat %v\n", fqdn, strings.Join(statParts, "."))
-       }
-
-       statName := statParts[len(statParts)-1]
-
-       dsStat, ok := stats[ds]
-       if !ok {
-               newStat := dsdata.NewStat()
-               dsStat = *newStat
-       }
-
-       if err := addCacheStat(&dsStat.TotalStats, statName, value); err != nil 
{
-               return stats, err
-       }
-
-       cachegroup, ok := toData.ServerCachegroups[server]
-       if !ok {
-               return stats, fmt.Errorf("server missing from 
TOData.ServerCachegroups")
-       }
-       dsStat.CacheGroups[cachegroup] = dsStat.TotalStats
-
-       cacheType, ok := toData.ServerTypes[server]
-       if !ok {
-               return stats, fmt.Errorf("server missing from 
TOData.ServerTypes")
-       }
-       dsStat.Types[cacheType] = dsStat.TotalStats
-
-       dsStat.Caches[server] = dsStat.TotalStats
-
-       dsStat.CachesTimeReceived[server] = timeReceived
-       stats[ds] = dsStat
-       return stats, nil
-}
-
-// addCacheStat adds the given stat to the existing stat. Note this adds, it 
doesn't overwrite. Numbers are summed, strings are concatenated.
-// TODO make this less duplicate code somehow.
-func addCacheStat(stat *dsdata.StatCacheStats, name string, val interface{}) 
error {
-       switch name {
-       case "status_2xx":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Status2xx.Value += int64(v)
-       case "status_3xx":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Status3xx.Value += int64(v)
-       case "status_4xx":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Status4xx.Value += int64(v)
-       case "status_5xx":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Status5xx.Value += int64(v)
-       case "out_bytes":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.OutBytes.Value += int64(v)
-       case "is_available":
-               v, ok := val.(bool)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected bool actual 
'%v' type %T", name, val, val)
-               }
-               if v {
-                       stat.IsAvailable.Value = true
-               }
-       case "in_bytes":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.InBytes.Value += v
-       case "tps_2xx":
-               v, ok := val.(int64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Tps2xx.Value += float64(v)
-       case "tps_3xx":
-               v, ok := val.(int64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Tps3xx.Value += float64(v)
-       case "tps_4xx":
-               v, ok := val.(int64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Tps4xx.Value += float64(v)
-       case "tps_5xx":
-               v, ok := val.(int64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.Tps5xx.Value += float64(v)
-       case "error_string":
-               v, ok := val.(string)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected string 
actual '%v' type %T", name, val, val)
-               }
-               stat.ErrorString.Value += v + ", "
-       case "tps_total":
-               v, ok := val.(float64)
-               if !ok {
-                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
-               }
-               stat.TpsTotal.Value += v
-       case "status_unknown":
-               return dsdata.ErrNotProcessedStat
-       default:
-               return fmt.Errorf("unknown stat '%s'", name)
-       }
-       return nil
-}
diff --git a/traffic_monitor/cache/data_test.go 
b/traffic_monitor/cache/data_test.go
index 6d90774a6d..0ac20b500a 100644
--- a/traffic_monitor/cache/data_test.go
+++ b/traffic_monitor/cache/data_test.go
@@ -164,12 +164,11 @@ func randDsStat() dsdata.Stat {
        }
 
        return dsdata.Stat{
-               CommonStats:        randStatCommon(),
-               CacheGroups:        cacheGroups,
-               Types:              types,
-               Caches:             caches,
-               CachesTimeReceived: cachesTime,
-               TotalStats:         randStatCacheStats(),
+               CommonStats: randStatCommon(),
+               CacheGroups: cacheGroups,
+               Types:       types,
+               Caches:      caches,
+               TotalStats:  randStatCacheStats(),
        }
 }
 
diff --git a/traffic_monitor/cache/stats_type_astats.go 
b/traffic_monitor/cache/stats_type_astats.go
new file mode 100644
index 0000000000..98b7919d49
--- /dev/null
+++ b/traffic_monitor/cache/stats_type_astats.go
@@ -0,0 +1,253 @@
+package cache
+
+// stats_type_astats is the default Stats format for Traffic Control.
+// It is the Stats format produced by the `astats` plugin to Apache Traffic 
Server, included with Traffic Control.
+//
+// Stats are of the form `{"ats": {"name": number}}`,
+// Where `name` is of the form:
+//   `"plugin.remap_stats.fully-qualified-domain-name.example.net.stat-name"`
+// Where `stat-name` is one of:
+//   `in_bytes`, `out_bytes`, `status_2xx`, `status_3xx`, `status_4xx`, 
`status_5xx`
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "strconv"
+       "strings"
+
+       "github.com/apache/incubator-trafficcontrol/lib/go-log"
+       "github.com/apache/incubator-trafficcontrol/lib/go-tc"
+       "github.com/apache/incubator-trafficcontrol/traffic_monitor/dsdata"
+       "github.com/apache/incubator-trafficcontrol/traffic_monitor/todata"
+)
+
+func init() {
+       AddStatsType("astats", astatsParse, astatsPrecompute)
+}
+
+func astatsParse(cache tc.CacheName, r io.Reader) (error, 
map[string]interface{}, AstatsSystem) {
+       astats := Astats{}
+       err := json.NewDecoder(r).Decode(&astats)
+       return err, astats.Ats, astats.System
+}
+
+func astatsPrecompute(cache tc.CacheName, toData todata.TOData, rawStats 
map[string]interface{}, system AstatsSystem) PrecomputedData {
+       stats := map[tc.DeliveryServiceName]dsdata.Stat{}
+       precomputed := PrecomputedData{}
+       var err error
+       if precomputed.OutBytes, err = astatsOutBytes(system.ProcNetDev, 
system.InfName); err != nil {
+               precomputed.OutBytes = 0
+               log.Errorf("precomputeAstats %s handle precomputing outbytes 
'%v'\n", cache, err)
+       }
+
+       kbpsInMbps := int64(1000)
+       precomputed.MaxKbps = int64(system.InfSpeed) * kbpsInMbps
+
+       for stat, value := range rawStats {
+               var err error
+               stats, err = astatsProcessStat(cache, stats, toData, stat, 
value)
+               if err != nil && err != dsdata.ErrNotProcessedStat {
+                       log.Infof("precomputing cache %v stat %v value %v error 
%v", cache, stat, value, err)
+                       precomputed.Errors = append(precomputed.Errors, err)
+               }
+       }
+       precomputed.DeliveryServiceStats = stats
+       return precomputed
+}
+
+// astatsOutBytes takes the proc.net.dev string, and the interface name, and returns 
the bytes field
+func astatsOutBytes(procNetDev, iface string) (int64, error) {
+       if procNetDev == "" {
+               return 0, fmt.Errorf("procNetDev empty")
+       }
+       if iface == "" {
+               return 0, fmt.Errorf("iface empty")
+       }
+       ifacePos := strings.Index(procNetDev, iface)
+       if ifacePos == -1 {
+               return 0, fmt.Errorf("interface '%s' not found in proc.net.dev 
'%s'", iface, procNetDev)
+       }
+
+       procNetDevIfaceBytes := procNetDev[ifacePos+len(iface)+1:]
+       procNetDevIfaceBytesArr := strings.Fields(procNetDevIfaceBytes) // TODO 
test
+       if len(procNetDevIfaceBytesArr) < 10 {
+               return 0, fmt.Errorf("proc.net.dev iface '%v' unknown format 
'%s'", iface, procNetDev)
+       }
+       procNetDevIfaceBytes = procNetDevIfaceBytesArr[8]
+
+       return strconv.ParseInt(procNetDevIfaceBytes, 10, 64)
+}
+
+// astatsProcessStat and its subsidiary functions act as a State Machine, 
flowing the stat thru states for each "." component of the stat name
+func astatsProcessStat(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, error) {
+       parts := strings.Split(stat, ".")
+       if len(parts) < 1 {
+               return stats, fmt.Errorf("stat has no initial part")
+       }
+
+       switch parts[0] {
+       case "plugin":
+               return astatsProcessStatPlugin(server, stats, toData, stat, 
parts[1:], value)
+       case "proxy":
+               return stats, dsdata.ErrNotProcessedStat
+       case "server":
+               return stats, dsdata.ErrNotProcessedStat
+       default:
+               return stats, fmt.Errorf("stat '%s' has unknown initial part 
'%s'", stat, parts[0])
+       }
+}
+
+func astatsProcessStatPlugin(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
statParts []string, value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, 
error) {
+       if len(statParts) < 1 {
+               return stats, fmt.Errorf("stat has no plugin part")
+       }
+       switch statParts[0] {
+       case "remap_stats":
+               return astatsProcessStatPluginRemapStats(server, stats, toData, 
stat, statParts[1:], value)
+       default:
+               return stats, fmt.Errorf("stat has unknown plugin part '%s'", 
statParts[0])
+       }
+}
+
+func astatsProcessStatPluginRemapStats(server tc.CacheName, stats 
map[tc.DeliveryServiceName]dsdata.Stat, toData todata.TOData, stat string, 
statParts []string, value interface{}) (map[tc.DeliveryServiceName]dsdata.Stat, 
error) {
+       if len(statParts) < 3 {
+               return stats, fmt.Errorf("stat has no remap_stats 
deliveryservice and name parts")
+       }
+
+       // the FQDN is `subsubdomain`.`subdomain`.`domain`. For a HTTP delivery 
service, `subsubdomain` will be the cache hostname; for a DNS delivery service, 
it will be `edge`. Then, `subdomain` is the delivery service regex.
+       subsubdomain := statParts[0]
+       subdomain := statParts[1]
+       domain := strings.Join(statParts[2:len(statParts)-1], ".")
+
+       ds, ok := toData.DeliveryServiceRegexes.DeliveryService(domain, 
subdomain, subsubdomain)
+       if !ok {
+               fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
+               return stats, fmt.Errorf("ERROR no delivery service match for 
fqdn '%v' stat '%v'\n", fqdn, strings.Join(statParts, "."))
+       }
+       if ds == "" {
+               fqdn := fmt.Sprintf("%s.%s.%s", subsubdomain, subdomain, domain)
+               return stats, fmt.Errorf("ERROR EMPTY delivery service fqdn %v 
stat %v\n", fqdn, strings.Join(statParts, "."))
+       }
+
+       statName := statParts[len(statParts)-1]
+
+       dsStat, ok := stats[ds]
+       if !ok {
+               newStat := dsdata.NewStat()
+               dsStat = *newStat
+       }
+
+       if err := astatsAddCacheStat(&dsStat.TotalStats, statName, value); err 
!= nil {
+               return stats, err
+       }
+
+       cachegroup, ok := toData.ServerCachegroups[server]
+       if !ok {
+               return stats, fmt.Errorf("server missing from 
TOData.ServerCachegroups")
+       }
+       dsStat.CacheGroups[cachegroup] = dsStat.TotalStats
+
+       cacheType, ok := toData.ServerTypes[server]
+       if !ok {
+               return stats, fmt.Errorf("server missing from 
TOData.ServerTypes")
+       }
+       dsStat.Types[cacheType] = dsStat.TotalStats
+
+       dsStat.Caches[server] = dsStat.TotalStats
+
+       stats[ds] = dsStat
+       return stats, nil
+}
+
+// astatsAddCacheStat adds the given stat to the existing stat. Note this adds, it 
doesn't overwrite. Numbers are summed, strings are concatenated, and booleans are OR'd.
+// TODO make this less duplicate code somehow.
+func astatsAddCacheStat(stat *dsdata.StatCacheStats, name string, val 
interface{}) error {
+       switch name {
+       case "status_2xx":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Status2xx.Value += int64(v)
+       case "status_3xx":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Status3xx.Value += int64(v)
+       case "status_4xx":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Status4xx.Value += int64(v)
+       case "status_5xx":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Status5xx.Value += int64(v)
+       case "out_bytes":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.OutBytes.Value += int64(v)
+       case "is_available":
+               v, ok := val.(bool)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected bool actual 
'%v' type %T", name, val, val)
+               }
+               if v {
+                       stat.IsAvailable.Value = true
+               }
+       case "in_bytes":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.InBytes.Value += v
+       case "tps_2xx":
+               v, ok := val.(int64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Tps2xx.Value += float64(v)
+       case "tps_3xx":
+               v, ok := val.(int64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Tps3xx.Value += float64(v)
+       case "tps_4xx":
+               v, ok := val.(int64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Tps4xx.Value += float64(v)
+       case "tps_5xx":
+               v, ok := val.(int64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.Tps5xx.Value += float64(v)
+       case "error_string":
+               v, ok := val.(string)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected string 
actual '%v' type %T", name, val, val)
+               }
+               stat.ErrorString.Value += v + ", "
+       case "tps_total":
+               v, ok := val.(float64)
+               if !ok {
+                       return fmt.Errorf("stat '%s' value expected int actual 
'%v' type %T", name, val, val)
+               }
+               stat.TpsTotal.Value += v
+       case "status_unknown":
+               return dsdata.ErrNotProcessedStat
+       default:
+               return fmt.Errorf("unknown stat '%s'", name)
+       }
+       return nil
+}
diff --git a/traffic_monitor/cache/stats_types.go 
b/traffic_monitor/cache/stats_types.go
new file mode 100644
index 0000000000..ee18623cbd
--- /dev/null
+++ b/traffic_monitor/cache/stats_types.go
@@ -0,0 +1,48 @@
+package cache
+
+import (
+       "io"
+
+       "github.com/apache/incubator-trafficcontrol/lib/go-tc"
+       "github.com/apache/incubator-trafficcontrol/traffic_monitor/todata"
+)
+
+//
+// To create a new Stats Type, for a custom caching proxy with its own stats 
format:
+//
+// 1. Create a file for your type in this directory and package, 
`traffic_monitor/cache/`
+// 2. Create Parse and Precompute functions in your file, with the signature 
of `StatsTypeParser` and `StatsTypePrecomputer`
+// 3. In your file, add `func init(){AddStatsType("myTypeName", myTypeParser, 
myTypePrecomputer)}`
+//
+// Your Parser should take the raw bytes from the `io.Reader` and populate the 
raw stats from them. For maximum compatibility, the names of these should be of 
the same form as Apache Traffic Server's `stats_over_http`, of the form 
"plugin.remap_stats.delivery-service-fqdn.com.in_bytes" et cetera. Traffic 
Control _may_ work with custom stat names, but we don't currently guarantee it.
+//
+// Your Precomputer should take the Stats and System information your Parser 
created, and populate the PrecomputedData. It is essential that all 
PrecomputedData fields are populated, especially `DeliveryServiceStats`, as 
they are used for cache and delivery service availability and threshold 
computation. If PrecomputedData is not properly and fully populated, the 
cache's availability will not be properly computed.
+//
+// Note your Precomputer is not called for Health polls, only Stat polls. Your 
Cache should have two separate stats endpoints: a small light endpoint 
returning only system stats and used to quickly verify reachability, and a 
large endpoint with all stats. If your cache does not have two stat endpoints, 
you may use your large stat endpoint for the Health poll, and configure the 
Health poll interval to be arbitrarily slow.
+//
+// Note the PrecomputedData `Reporting` and `Time` fields are the exception: 
they do not need to be set, and will be forcibly overridden by the Handler 
after your Precomputer function returns.
+//
+// Note your stats functions SHOULD NOT reuse functions from other stats 
types, even if they are similar, or have identical helper functions. This is a 
case where "duplicate" code is acceptable, because it's not conceptually 
duplicate. You don't want your stat parsers to break if the similar stats 
format you reuse code from changes.
+//
+
+const DefaultStatsType = "astats"
+
+// StatsTypeDecoder is a pair of functions registered for decoding a 
particular Stats type: one for parsing stats, and one for creating precomputed data.
+type StatsTypeDecoder struct {
+       Parse      StatsTypeParser
+       Precompute StatsTypePrecomputer
+}
+
+// StatsTypeParser takes the bytes returned from the cache's stats endpoint, 
along with the cache name, and returns the map of raw stats (whose names must 
be strings, and values may be any primitive type but MUST be float64 if they 
are used by a Parameter Threshold) and System information.
+type StatsTypeParser func(cache tc.CacheName, r io.Reader) (error, 
map[string]interface{}, AstatsSystem)
+
+// StatsTypePrecomputer takes the cache name, the Traffic Ops data, and the 
raw stats and system information created by Parse, and returns the 
PrecomputedData. It takes no receive time: the Handler sets the 
PrecomputedData Time field itself. Note this will only be called for Stats 
polls, not Health polls. Note errors should be returned in PrecomputedData.Errors.
+//
+type StatsTypePrecomputer func(cache tc.CacheName, toData todata.TOData, stats 
map[string]interface{}, system AstatsSystem) PrecomputedData
+
+// StatsTypeDecoders holds the functions for parsing cache stats. This is not 
const, because Go doesn't allow constant maps. This is populated on startup, 
and MUST NOT be modified after startup.
+var StatsTypeDecoders = map[string]StatsTypeDecoder{}
+
+func AddStatsType(typeName string, parser StatsTypeParser, precomputer 
StatsTypePrecomputer) {
+       StatsTypeDecoders[typeName] = StatsTypeDecoder{Parse: parser, 
Precompute: precomputer}
+}
diff --git a/traffic_monitor/ds/stat.go b/traffic_monitor/ds/stat.go
index 8ef2244b0e..82008bdbf6 100644
--- a/traffic_monitor/ds/stat.go
+++ b/traffic_monitor/ds/stat.go
@@ -383,7 +383,6 @@ func CreateStats(precomputed 
map[tc.CacheName]cache.PrecomputedData, toData toda
                        httpDsStat.CacheGroups[cachegroup] = 
httpDsStat.CacheGroups[cachegroup].Sum(resultStat.CacheGroups[cachegroup])
                        httpDsStat.Types[serverType] = 
httpDsStat.Types[serverType].Sum(resultStat.Types[serverType])
                        httpDsStat.Caches[server] = 
httpDsStat.Caches[server].Sum(resultStat.Caches[server])
-                       httpDsStat.CachesTimeReceived[server] = 
resultStat.CachesTimeReceived[server]
                        httpDsStat.CommonStats = 
dsStats.DeliveryService[ds].CommonStats
                        dsStats.DeliveryService[ds] = httpDsStat // TODO 
determine if necessary
                }
diff --git a/traffic_monitor/dsdata/stat.go b/traffic_monitor/dsdata/stat.go
index 80e9010456..43d168757d 100644
--- a/traffic_monitor/dsdata/stat.go
+++ b/traffic_monitor/dsdata/stat.go
@@ -223,12 +223,11 @@ func (a StatCacheStats) Sum(b StatCacheStats) 
StatCacheStats {
 
 // Stat represents a complete delivery service stat, for a given poll, or at 
the time requested.
 type Stat struct {
-       CommonStats        StatCommon
-       CacheGroups        map[tc.CacheGroupName]StatCacheStats
-       Types              map[tc.CacheType]StatCacheStats
-       Caches             map[tc.CacheName]StatCacheStats
-       CachesTimeReceived map[tc.CacheName]time.Time
-       TotalStats         StatCacheStats
+       CommonStats StatCommon
+       CacheGroups map[tc.CacheGroupName]StatCacheStats
+       Types       map[tc.CacheType]StatCacheStats
+       Caches      map[tc.CacheName]StatCacheStats
+       TotalStats  StatCacheStats
 }
 
 // ErrNotProcessedStat indicates a stat received is not used by Traffic 
Monitor, nor returned by any API endpoint. Receiving this error indicates the 
stat has been discarded.
@@ -237,23 +236,21 @@ var ErrNotProcessedStat = errors.New("This stat is not 
used.")
 // NewStat returns a new delivery service Stat, initializing pointer members.
 func NewStat() *Stat {
        return &Stat{
-               CacheGroups:        map[tc.CacheGroupName]StatCacheStats{},
-               Types:              map[tc.CacheType]StatCacheStats{},
-               CommonStats:        StatCommon{CachesReporting: 
map[tc.CacheName]bool{}},
-               Caches:             map[tc.CacheName]StatCacheStats{},
-               CachesTimeReceived: map[tc.CacheName]time.Time{},
+               CacheGroups: map[tc.CacheGroupName]StatCacheStats{},
+               Types:       map[tc.CacheType]StatCacheStats{},
+               CommonStats: StatCommon{CachesReporting: 
map[tc.CacheName]bool{}},
+               Caches:      map[tc.CacheName]StatCacheStats{},
        }
 }
 
 // Copy performs a deep copy of this Stat. It does not modify, and is thus 
safe for multiple goroutines.
 func (a Stat) Copy() Stat {
        b := Stat{
-               CommonStats:        a.CommonStats.Copy(),
-               TotalStats:         a.TotalStats,
-               CacheGroups:        map[tc.CacheGroupName]StatCacheStats{},
-               Types:              map[tc.CacheType]StatCacheStats{},
-               Caches:             map[tc.CacheName]StatCacheStats{},
-               CachesTimeReceived: map[tc.CacheName]time.Time{},
+               CommonStats: a.CommonStats.Copy(),
+               TotalStats:  a.TotalStats,
+               CacheGroups: map[tc.CacheGroupName]StatCacheStats{},
+               Types:       map[tc.CacheType]StatCacheStats{},
+               Caches:      map[tc.CacheName]StatCacheStats{},
        }
        for k, v := range a.CacheGroups {
                b.CacheGroups[k] = v
@@ -264,9 +261,6 @@ func (a Stat) Copy() Stat {
        for k, v := range a.Caches {
                b.Caches[k] = v
        }
-       for k, v := range a.CachesTimeReceived {
-               b.CachesTimeReceived[k] = v
-       }
        return b
 }
 
diff --git a/traffic_monitor/fetcher/fetcher.go 
b/traffic_monitor/fetcher/fetcher.go
index 8f18eed5ba..0a9238555e 100644
--- a/traffic_monitor/fetcher/fetcher.go
+++ b/traffic_monitor/fetcher/fetcher.go
@@ -30,7 +30,7 @@ import (
 )
 
 type Fetcher interface {
-       Fetch(id string, url string, host string, pollId uint64, 
pollFinishedChan chan<- uint64)
+       Fetch(id string, url string, host string, format string, pollId uint64, 
pollFinishedChan chan<- uint64)
 }
 
 type HttpFetcher struct {
@@ -46,7 +46,7 @@ type Result struct {
        Error  error
 }
 
-func (f HttpFetcher) Fetch(id string, url string, host string, pollId uint64, 
pollFinishedChan chan<- uint64) {
+func (f HttpFetcher) Fetch(id string, url string, host string, format string, 
pollId uint64, pollFinishedChan chan<- uint64) {
        log.Debugf("poll %v %v fetch start\n", pollId, time.Now())
        req, err := http.NewRequest("GET", url, nil)
        // TODO: change this to use f.Headers. -jse
@@ -76,8 +76,8 @@ func (f HttpFetcher) Fetch(id string, url string, host 
string, pollId uint64, po
 
        if err == nil && response != nil {
                log.Debugf("poll %v %v fetch end\n", pollId, time.Now())
-               f.Handler.Handle(id, response.Body, reqTime, reqEnd, err, 
pollId, pollFinishedChan)
+               f.Handler.Handle(id, response.Body, format, reqTime, reqEnd, 
err, pollId, pollFinishedChan)
        } else {
-               f.Handler.Handle(id, nil, reqTime, reqEnd, err, pollId, 
pollFinishedChan)
+               f.Handler.Handle(id, nil, format, reqTime, reqEnd, err, pollId, 
pollFinishedChan)
        }
 }
diff --git a/traffic_monitor/handler/handler.go 
b/traffic_monitor/handler/handler.go
index 3b001c62ae..5777bb7d31 100644
--- a/traffic_monitor/handler/handler.go
+++ b/traffic_monitor/handler/handler.go
@@ -40,5 +40,5 @@ type OpsConfig struct {
 }
 
 type Handler interface {
-       Handle(string, io.Reader, time.Duration, time.Time, error, uint64, 
chan<- uint64)
+       Handle(string, io.Reader, string, time.Duration, time.Time, error, 
uint64, chan<- uint64)
 }
diff --git a/traffic_monitor/manager/monitorconfig.go 
b/traffic_monitor/manager/monitorconfig.go
index 1291fc0a15..c33ee451ef 100644
--- a/traffic_monitor/manager/monitorconfig.go
+++ b/traffic_monitor/manager/monitorconfig.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/incubator-trafficcontrol/lib/go-log"
        "github.com/apache/incubator-trafficcontrol/lib/go-tc"
        "github.com/apache/incubator-trafficcontrol/traffic_monitor/config"
+       "github.com/apache/incubator-trafficcontrol/traffic_monitor/cache"
        "github.com/apache/incubator-trafficcontrol/traffic_monitor/peer"
        "github.com/apache/incubator-trafficcontrol/traffic_monitor/poller"
        "github.com/apache/incubator-trafficcontrol/traffic_monitor/threadsafe"
@@ -253,6 +254,13 @@ func monitorConfigListen(
                                log.Errorf("monitor config server %v profile %v 
has no polling URL; can't poll", srv.HostName, srv.Profile)
                                continue
                        }
+
+                       format := 
monitorConfig.Profile[srv.Profile].Parameters.HealthPollingFormat
+                       if format == "" {
+                               format = cache.DefaultStatsType
+                               log.Infof("health.polling.format for '%v' is 
empty, using default '%v'", srv.HostName, format)
+                       }
+
                        r := strings.NewReplacer(
                                "${hostname}", srv.IP,
                                "${interface_name}", srv.InterfaceName,
@@ -267,10 +275,10 @@ func monitorConfigListen(
                                log.Warnln("profile " + srv.Profile + " 
health.connection.timeout Parameter is missing or zero, using default " + 
DefaultHealthConnectionTimeout.String())
                        }
 
-                       healthURLs[srv.HostName] = poller.PollConfig{URL: url, 
Host: srv.FQDN, Timeout: connTimeout}
+                       healthURLs[srv.HostName] = poller.PollConfig{URL: url, 
Host: srv.FQDN, Timeout: connTimeout, Format: format}
                        r = strings.NewReplacer("application=system", 
"application=")
                        statURL := r.Replace(url)
-                       statURLs[srv.HostName] = poller.PollConfig{URL: 
statURL, Host: srv.FQDN, Timeout: connTimeout}
+                       statURLs[srv.HostName] = poller.PollConfig{URL: 
statURL, Host: srv.FQDN, Timeout: connTimeout, Format: format}
                }
 
                peerSet := map[tc.TrafficMonitorName]struct{}{}
diff --git a/traffic_monitor/peer/peer.go b/traffic_monitor/peer/peer.go
index 384d787595..21d879b114 100644
--- a/traffic_monitor/peer/peer.go
+++ b/traffic_monitor/peer/peer.go
@@ -30,7 +30,6 @@ import (
 // Handler handles peer Traffic Monitor data, taking a raw reader, parsing the 
data, and passing a result object to the ResultChannel. This fulfills the 
common `Handler` interface.
 type Handler struct {
        ResultChannel chan Result
-       Notify        int
 }
 
 // NewHandler returns a new peer Handler.
@@ -50,7 +49,7 @@ type Result struct {
 }
 
 // Handle handles a response from a polled Traffic Monitor peer, parsing the 
data and forwarding it to the ResultChannel.
-func (handler Handler) Handle(id string, r io.Reader, reqTime time.Duration, 
reqEnd time.Time, err error, pollID uint64, pollFinished chan<- uint64) {
+func (handler Handler) Handle(id string, r io.Reader, format string, reqTime 
time.Duration, reqEnd time.Time, err error, pollID uint64, pollFinished chan<- 
uint64) {
        result := Result{
                ID:           tc.TrafficMonitorName(id),
                Available:    false,
diff --git a/traffic_monitor/poller/poller.go b/traffic_monitor/poller/poller.go
index 84f5f7b387..a8fff5e1f5 100644
--- a/traffic_monitor/poller/poller.go
+++ b/traffic_monitor/poller/poller.go
@@ -49,6 +49,7 @@ type PollConfig struct {
        Host    string
        Timeout time.Duration
        Handler handler.Handler
+       Format  string
 }
 
 type HttpPollerConfig struct {
@@ -177,11 +178,8 @@ var debugPollNum uint64
 type HTTPPollInfo struct {
        NoKeepAlive bool
        Interval    time.Duration
-       Timeout     time.Duration
        ID          string
-       URL         string
-       Host        string
-       Handler     handler.Handler
+       PollConfig
 }
 
 func (p HttpPoller) Poll() {
@@ -219,7 +217,7 @@ func (p HttpPoller) Poll() {
                                        }
                                }
                        }
-                       go poller(info.Interval, info.ID, info.URL, info.Host, 
fetcher, kill)
+                       go poller(info.Interval, info.ID, info.URL, info.Host, 
info.Format, fetcher, kill)
                }
                p.Config = newConfig
        }
@@ -235,7 +233,7 @@ func mustDie(die <-chan struct{}) bool {
 }
 
 // TODO iterationCount and/or p.TickChan?
-func poller(interval time.Duration, id string, url string, host string, 
fetcher fetcher.Fetcher, die <-chan struct{}) {
+func poller(interval time.Duration, id string, url string, host string, format 
string, fetcher fetcher.Fetcher, die <-chan struct{}) {
        pollSpread := 
time.Duration(rand.Float64()*float64(interval/time.Nanosecond)) * 
time.Nanosecond
        time.Sleep(pollSpread)
        tick := time.NewTicker(interval)
@@ -252,7 +250,7 @@ func poller(interval time.Duration, id string, url string, 
host string, fetcher
                        pollId := atomic.AddUint64(&debugPollNum, 1)
                        pollFinishedChan := make(chan uint64)
                        log.Debugf("poll %v %v start\n", pollId, time.Now())
-                       go fetcher.Fetch(id, url, host, pollId, 
pollFinishedChan) // TODO persist fetcher, with its own die chan?
+                       go fetcher.Fetch(id, url, host, format, pollId, 
pollFinishedChan) // TODO persist fetcher, with its own die chan?
                        <-pollFinishedChan
                case <-die:
                        tick.Stop()
@@ -275,9 +273,7 @@ func diffConfigs(old HttpPollerConfig, new 
HttpPollerConfig) ([]string, []HTTPPo
                                Interval:    new.Interval,
                                NoKeepAlive: new.NoKeepAlive,
                                ID:          id,
-                               URL:         pollCfg.URL,
-                               Host:        pollCfg.Host,
-                               Timeout:     pollCfg.Timeout,
+                               PollConfig: pollCfg,
                        })
                }
                return deletions, additions
@@ -293,9 +289,7 @@ func diffConfigs(old HttpPollerConfig, new 
HttpPollerConfig) ([]string, []HTTPPo
                                Interval:    new.Interval,
                                NoKeepAlive: new.NoKeepAlive,
                                ID:          id,
-                               URL:         newPollCfg.URL,
-                               Host:        newPollCfg.Host,
-                               Timeout:     newPollCfg.Timeout,
+                               PollConfig: newPollCfg,
                        })
                }
        }
@@ -307,9 +301,7 @@ func diffConfigs(old HttpPollerConfig, new 
HttpPollerConfig) ([]string, []HTTPPo
                                Interval:    new.Interval,
                                NoKeepAlive: new.NoKeepAlive,
                                ID:          id,
-                               URL:         newPollCfg.URL,
-                               Host:        newPollCfg.Host,
-                               Timeout:     newPollCfg.Timeout,
+                               PollConfig: newPollCfg,
                        })
                }
        }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to