Change TM2 manager params to be multiline. Changes function definitions and calls to put each param on its own line. These managers have a large number of parameters, so this makes the code easier to read and makes diffs cleaner.
Fixes #2012 Project: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/commit/f0413d13 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/tree/f0413d13 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/diff/f0413d13 Branch: refs/heads/psql Commit: f0413d1313e405778213811dc46b29d3663475b1 Parents: 105be0f Author: Robert Butts <robert.o.bu...@gmail.com> Authored: Thu Oct 13 08:46:15 2016 -0600 Committer: Robert Butts <robert.o.bu...@gmail.com> Committed: Thu Oct 13 08:59:28 2016 -0600 ---------------------------------------------------------------------- .../traffic_monitor/manager/datarequest.go | 31 +++++- .../traffic_monitor/manager/healthresult.go | 102 +++++++++++++++++-- .../traffic_monitor/manager/manager.go | 14 ++- .../traffic_monitor/manager/monitorconfig.go | 31 +++++- .../traffic_monitor/manager/peer.go | 6 +- .../traffic_monitor/manager/stathistory.go | 20 +++- 6 files changed, 187 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go index 16bb43e..e84cef2 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/datarequest.go @@ -400,8 +400,26 @@ func NewPeerStateFilter(params url.Values, cacheTypes map[enum.CacheName]enum.Ca } // DataRequest takes an `http_server.DataRequest`, and the monitored data objects, and returns the appropriate response, and the status code. 
-func DataRequest(req http_server.DataRequest, opsConfig OpsConfigThreadsafe, toSession towrap.ITrafficOpsSession, localStates peer.CRStatesThreadsafe, peerStates peer.CRStatesPeersThreadsafe, combinedStates peer.CRStatesThreadsafe, statHistory StatHistoryThreadsafe, dsStats DSStatsThreadsafe, events EventsThreadsafe, staticAppData StaticAppData, healthPollInterval time.Duration, lastHealthDurations DurationMapThreadsafe, fetchCount UintThreadsafe, healthIteration UintThreadsafe, errorCount UintThreadsafe, toData todata.TODataThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe, lastStats LastStatsThreadsafe) (body []byte, responseCode int) { - +func DataRequest( + req http_server.DataRequest, + opsConfig OpsConfigThreadsafe, + toSession towrap.ITrafficOpsSession, + localStates peer.CRStatesThreadsafe, + peerStates peer.CRStatesPeersThreadsafe, + combinedStates peer.CRStatesThreadsafe, + statHistory StatHistoryThreadsafe, + dsStats DSStatsThreadsafe, + events EventsThreadsafe, + staticAppData StaticAppData, + healthPollInterval time.Duration, + lastHealthDurations DurationMapThreadsafe, + fetchCount UintThreadsafe, + healthIteration UintThreadsafe, + errorCount UintThreadsafe, + toData todata.TODataThreadsafe, + localCacheStatus CacheAvailableStatusThreadsafe, + lastStats LastStatsThreadsafe, +) (body []byte, responseCode int) { // handleErr takes an error, and the request type it came from, and logs. It is ok to call with a nil error, in which case this is a no-op. 
handleErr := func(err error, requestType http_server.Type) { if err == nil { @@ -524,7 +542,14 @@ func DataRequest(req http_server.DataRequest, opsConfig OpsConfigThreadsafe, toS } } -func createCacheStatuses(cacheTypes map[enum.CacheName]enum.CacheType, statHistory map[enum.CacheName][]cache.Result, lastHealthDurations map[enum.CacheName]time.Duration, cacheStates map[enum.CacheName]peer.IsAvailable, lastStats ds.LastStats, localCacheStatusThreadsafe CacheAvailableStatusThreadsafe) map[enum.CacheName]CacheStatus { +func createCacheStatuses( + cacheTypes map[enum.CacheName]enum.CacheType, + statHistory map[enum.CacheName][]cache.Result, + lastHealthDurations map[enum.CacheName]time.Duration, + cacheStates map[enum.CacheName]peer.IsAvailable, + lastStats ds.LastStats, + localCacheStatusThreadsafe CacheAvailableStatusThreadsafe, +) map[enum.CacheName]CacheStatus { conns := createCacheConnections(statHistory) statii := map[enum.CacheName]CacheStatus{} localCacheStatus := localCacheStatusThreadsafe.Get() http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go index 01a7132..f58565b 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/healthresult.go @@ -51,18 +51,57 @@ func (o *DurationMapThreadsafe) Set(d DurationMap) { // This poll should be quicker and less computationally expensive for ATS, but // doesn't include all stat data needed for e.g. delivery service calculations.4 // Returns the last health durations, events, and the local cache statuses. 
-func StartHealthResultManager(cacheHealthChan <-chan cache.Result, toData todata.TODataThreadsafe, localStates peer.CRStatesThreadsafe, statHistory StatHistoryThreadsafe, monitorConfig TrafficMonitorConfigMapThreadsafe, peerStates peer.CRStatesPeersThreadsafe, combinedStates peer.CRStatesThreadsafe, fetchCount UintThreadsafe, errorCount UintThreadsafe, cfg config.Config) (DurationMapThreadsafe, EventsThreadsafe, CacheAvailableStatusThreadsafe) { +func StartHealthResultManager( + cacheHealthChan <-chan cache.Result, + toData todata.TODataThreadsafe, + localStates peer.CRStatesThreadsafe, + statHistory StatHistoryThreadsafe, + monitorConfig TrafficMonitorConfigMapThreadsafe, + peerStates peer.CRStatesPeersThreadsafe, + combinedStates peer.CRStatesThreadsafe, + fetchCount UintThreadsafe, + errorCount UintThreadsafe, + cfg config.Config, +) (DurationMapThreadsafe, EventsThreadsafe, CacheAvailableStatusThreadsafe) { lastHealthDurations := NewDurationMapThreadsafe() events := NewEventsThreadsafe(cfg.MaxEvents) localCacheStatus := NewCacheAvailableStatusThreadsafe() - go healthResultManagerListen(cacheHealthChan, toData, localStates, lastHealthDurations, statHistory, monitorConfig, peerStates, combinedStates, fetchCount, errorCount, events, localCacheStatus, cfg) + go healthResultManagerListen( + cacheHealthChan, + toData, + localStates, + lastHealthDurations, + statHistory, + monitorConfig, + peerStates, + combinedStates, + fetchCount, + errorCount, + events, + localCacheStatus, + cfg, + ) return lastHealthDurations, events, localCacheStatus } // cacheAggregateSeconds is how often to aggregate stats, if the health chan is never empty. 
(Otherwise, we read from the chan until it's empty, then aggregate, continuously) const cacheAggregateSeconds = 1 -func healthResultManagerListen(cacheHealthChan <-chan cache.Result, toData todata.TODataThreadsafe, localStates peer.CRStatesThreadsafe, lastHealthDurations DurationMapThreadsafe, statHistory StatHistoryThreadsafe, monitorConfig TrafficMonitorConfigMapThreadsafe, peerStates peer.CRStatesPeersThreadsafe, combinedStates peer.CRStatesThreadsafe, fetchCount UintThreadsafe, errorCount UintThreadsafe, events EventsThreadsafe, localCacheStatus CacheAvailableStatusThreadsafe, cfg config.Config) { +func healthResultManagerListen( + cacheHealthChan <-chan cache.Result, + toData todata.TODataThreadsafe, + localStates peer.CRStatesThreadsafe, + lastHealthDurations DurationMapThreadsafe, + statHistory StatHistoryThreadsafe, + monitorConfig TrafficMonitorConfigMapThreadsafe, + peerStates peer.CRStatesPeersThreadsafe, + combinedStates peer.CRStatesThreadsafe, + fetchCount UintThreadsafe, + errorCount UintThreadsafe, + events EventsThreadsafe, + localCacheStatus CacheAvailableStatusThreadsafe, + cfg config.Config, +) { lastHealthEndTimes := map[enum.CacheName]time.Time{} healthHistory := map[enum.CacheName][]cache.Result{} // This reads at least 1 value from the cacheHealthChan. Then, we loop, and try to read from the channel some more. If there's nothing to read, we hit `default` and process. If there is stuff to read, we read it, then inner-loop trying to read more. If we're continuously reading and the channel is never empty, and we hit the tick time, process anyway even though the channel isn't empty, to prevent never processing (starvation). 
@@ -75,14 +114,48 @@ func healthResultManagerListen(cacheHealthChan <-chan cache.Result, toData todat select { case <-tick: log.Warnf("Health Result Manager flushing queued results\n") - processHealthResult(cacheHealthChan, toData, localStates, lastHealthDurations, statHistory, monitorConfig, peerStates, combinedStates, fetchCount, errorCount, events, localCacheStatus, lastHealthEndTimes, healthHistory, results, cfg) + processHealthResult( + cacheHealthChan, + toData, + localStates, + lastHealthDurations, + statHistory, + monitorConfig, + peerStates, + combinedStates, + fetchCount, + errorCount, + events, + localCacheStatus, + lastHealthEndTimes, + healthHistory, + results, + cfg, + ) break innerLoop default: select { case r := <-cacheHealthChan: results = append(results, r) default: - processHealthResult(cacheHealthChan, toData, localStates, lastHealthDurations, statHistory, monitorConfig, peerStates, combinedStates, fetchCount, errorCount, events, localCacheStatus, lastHealthEndTimes, healthHistory, results, cfg) + processHealthResult( + cacheHealthChan, + toData, + localStates, + lastHealthDurations, + statHistory, + monitorConfig, + peerStates, + combinedStates, + fetchCount, + errorCount, + events, + localCacheStatus, + lastHealthEndTimes, + healthHistory, + results, + cfg, + ) break innerLoop } } @@ -91,7 +164,24 @@ func healthResultManagerListen(cacheHealthChan <-chan cache.Result, toData todat } // processHealthResult processes the given health results, adding their stats to the CacheAvailableStatus. Note this is NOT threadsafe, because it non-atomically gets CacheAvailableStatuses, Events, LastHealthDurations and later updates them. This MUST NOT be called from multiple threads. 
-func processHealthResult(cacheHealthChan <-chan cache.Result, toData todata.TODataThreadsafe, localStates peer.CRStatesThreadsafe, lastHealthDurationsThreadsafe DurationMapThreadsafe, statHistory StatHistoryThreadsafe, monitorConfig TrafficMonitorConfigMapThreadsafe, peerStates peer.CRStatesPeersThreadsafe, combinedStates peer.CRStatesThreadsafe, fetchCount UintThreadsafe, errorCount UintThreadsafe, events EventsThreadsafe, localCacheStatusThreadsafe CacheAvailableStatusThreadsafe, lastHealthEndTimes map[enum.CacheName]time.Time, healthHistory map[enum.CacheName][]cache.Result, results []cache.Result, cfg config.Config) { +func processHealthResult( + cacheHealthChan <-chan cache.Result, + toData todata.TODataThreadsafe, + localStates peer.CRStatesThreadsafe, + lastHealthDurationsThreadsafe DurationMapThreadsafe, + statHistory StatHistoryThreadsafe, + monitorConfig TrafficMonitorConfigMapThreadsafe, + peerStates peer.CRStatesPeersThreadsafe, + combinedStates peer.CRStatesThreadsafe, + fetchCount UintThreadsafe, + errorCount UintThreadsafe, + events EventsThreadsafe, + localCacheStatusThreadsafe CacheAvailableStatusThreadsafe, + lastHealthEndTimes map[enum.CacheName]time.Time, + healthHistory map[enum.CacheName][]cache.Result, + results []cache.Result, + cfg config.Config, +) { if len(results) == 0 { return } http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/manager.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/manager.go b/traffic_monitor/experimental/traffic_monitor/manager/manager.go index ed6a31d..cfdc2b1 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/manager.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/manager.go @@ -75,8 +75,18 @@ func Start(opsConfigFile string, cfg config.Config, staticAppData StaticAppData) cfg, staticAppData) - combinedStates := 
StartPeerManager(peerHandler.ResultChannel, localStates, peerStates) - statHistory, _, lastKbpsStats, dsStats := StartStatHistoryManager(cacheStatHandler.ResultChannel, combinedStates, toData, errorCount, cfg) + combinedStates := StartPeerManager( + peerHandler.ResultChannel, + localStates, + peerStates) + + statHistory, _, lastKbpsStats, dsStats := StartStatHistoryManager( + cacheStatHandler.ResultChannel, + combinedStates, + toData, + errorCount, + cfg) + lastHealthDurations, events, localCacheStatus := StartHealthResultManager( cacheHealthHandler.ResultChannel, toData, http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go index 9c4ad50..cbb7aa3 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/monitorconfig.go @@ -64,15 +64,40 @@ func (t *TrafficMonitorConfigMapThreadsafe) Set(c to.TrafficMonitorConfigMap) { t.m.Unlock() } -func StartMonitorConfigManager(monitorConfigPollChan <-chan to.TrafficMonitorConfigMap, localStates peer.CRStatesThreadsafe, statUrlSubscriber chan<- poller.HttpPollerConfig, healthUrlSubscriber chan<- poller.HttpPollerConfig, peerUrlSubscriber chan<- poller.HttpPollerConfig, cfg config.Config, staticAppData StaticAppData) TrafficMonitorConfigMapThreadsafe { +func StartMonitorConfigManager( + monitorConfigPollChan <-chan to.TrafficMonitorConfigMap, + localStates peer.CRStatesThreadsafe, + statUrlSubscriber chan<- poller.HttpPollerConfig, + healthUrlSubscriber chan<- poller.HttpPollerConfig, + peerUrlSubscriber chan<- poller.HttpPollerConfig, + cfg config.Config, + staticAppData StaticAppData, +) TrafficMonitorConfigMapThreadsafe { 
monitorConfig := NewTrafficMonitorConfigMapThreadsafe() - go monitorConfigListen(monitorConfig, monitorConfigPollChan, localStates, statUrlSubscriber, healthUrlSubscriber, peerUrlSubscriber, cfg, staticAppData) + go monitorConfigListen(monitorConfig, + monitorConfigPollChan, + localStates, + statUrlSubscriber, + healthUrlSubscriber, + peerUrlSubscriber, + cfg, + staticAppData, + ) return monitorConfig } // TODO timing, and determine if the case, or its internal `for`, should be put in a goroutine // TODO determine if subscribers take action on change, and change to mutexed objects if not. -func monitorConfigListen(monitorConfigTS TrafficMonitorConfigMapThreadsafe, monitorConfigPollChan <-chan to.TrafficMonitorConfigMap, localStates peer.CRStatesThreadsafe, statUrlSubscriber chan<- poller.HttpPollerConfig, healthUrlSubscriber chan<- poller.HttpPollerConfig, peerUrlSubscriber chan<- poller.HttpPollerConfig, cfg config.Config, staticAppData StaticAppData) { +func monitorConfigListen( + monitorConfigTS TrafficMonitorConfigMapThreadsafe, + monitorConfigPollChan <-chan to.TrafficMonitorConfigMap, + localStates peer.CRStatesThreadsafe, + statUrlSubscriber chan<- poller.HttpPollerConfig, + healthUrlSubscriber chan<- poller.HttpPollerConfig, + peerUrlSubscriber chan<- poller.HttpPollerConfig, + cfg config.Config, + staticAppData StaticAppData, +) { for { select { case monitorConfig := <-monitorConfigPollChan: http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/peer.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/peer.go b/traffic_monitor/experimental/traffic_monitor/manager/peer.go index e0b1b28..2f1b783 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/peer.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/peer.go @@ -9,7 +9,11 @@ import ( ) // StartPeerManager listens for peer 
results, and when it gets one, it adds it to the peerStates list, and optimistically combines the good results into combinedStates -func StartPeerManager(peerChan <-chan peer.Result, localStates peer.CRStatesThreadsafe, peerStates peer.CRStatesPeersThreadsafe) peer.CRStatesThreadsafe { +func StartPeerManager( + peerChan <-chan peer.Result, + localStates peer.CRStatesThreadsafe, + peerStates peer.CRStatesPeersThreadsafe, +) peer.CRStatesThreadsafe { combinedStates := peer.NewCRStatesThreadsafe() go func() { for { http://git-wip-us.apache.org/repos/asf/incubator-trafficcontrol/blob/f0413d13/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go ---------------------------------------------------------------------- diff --git a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go index cca4ca0..be4d1ef 100644 --- a/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go +++ b/traffic_monitor/experimental/traffic_monitor/manager/stathistory.go @@ -72,7 +72,13 @@ func pruneHistory(history []cache.Result, limit uint64) []cache.Result { // StartStatHistoryManager fetches the full statistics data from ATS Astats. This includes everything needed for all calculations, such as Delivery Services. This is expensive, though, and may be hard on ATS, so it should poll less often. // For a fast 'is it alive' poll, use the Health Result Manager poll. // Returns the stat history, the duration between the stat poll for each cache, the last Kbps data, and the calculated Delivery Service stats. 
-func StartStatHistoryManager(cacheStatChan <-chan cache.Result, combinedStates peer.CRStatesThreadsafe, toData todata.TODataThreadsafe, errorCount UintThreadsafe, cfg config.Config) (StatHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsThreadsafe) { +func StartStatHistoryManager( + cacheStatChan <-chan cache.Result, + combinedStates peer.CRStatesThreadsafe, + toData todata.TODataThreadsafe, + errorCount UintThreadsafe, + cfg config.Config, +) (StatHistoryThreadsafe, DurationMapThreadsafe, LastStatsThreadsafe, DSStatsThreadsafe) { statHistory := NewStatHistoryThreadsafe(cfg.MaxStatHistory) lastStatDurations := NewDurationMapThreadsafe() lastStatEndTimes := map[enum.CacheName]time.Time{} @@ -107,7 +113,17 @@ func StartStatHistoryManager(cacheStatChan <-chan cache.Result, combinedStates p } // processStatResults processes the given results, creating and setting DSStats, LastStats, and other stats. Note this is NOT threadsafe, and MUST NOT be called from multiple threads. -func processStatResults(results []cache.Result, statHistoryThreadsafe StatHistoryThreadsafe, combinedStates peer.Crstates, lastStats LastStatsThreadsafe, toData todata.TOData, errorCount UintThreadsafe, dsStats DSStatsThreadsafe, lastStatEndTimes map[enum.CacheName]time.Time, lastStatDurationsThreadsafe DurationMapThreadsafe) { +func processStatResults( + results []cache.Result, + statHistoryThreadsafe StatHistoryThreadsafe, + combinedStates peer.Crstates, + lastStats LastStatsThreadsafe, + toData todata.TOData, + errorCount UintThreadsafe, + dsStats DSStatsThreadsafe, + lastStatEndTimes map[enum.CacheName]time.Time, + lastStatDurationsThreadsafe DurationMapThreadsafe, +) { statHistory := statHistoryThreadsafe.Get().Copy() maxStats := statHistoryThreadsafe.Max() for _, result := range results {