rawlinp commented on a change in pull request #6017: URL: https://github.com/apache/trafficcontrol/pull/6017#discussion_r669031438
########## File path: traffic_ops/traffic_ops_golang/routing/middleware/readwhilewriter.go ########## @@ -0,0 +1,197 @@ +package middleware + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "net/http" + "strconv" + "sync" + + "github.com/apache/trafficcontrol/lib/go-log" + "github.com/apache/trafficcontrol/lib/go-tc" + "github.com/apache/trafficcontrol/lib/go-util" + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/auth" +) + +// ReadWhileWriterWrapper blocks multiple requests for the same object, +// and makes a single request to the real handler (which is probably expensive, e.g. database calls), +// and returns the result to all callers. +func ReadWhileWriterWrapper() Middleware { + rwr := NewRWR() + return func(h http.HandlerFunc) http.HandlerFunc { + return WrapReadWhileWriter(h, rwr) + } +} + +func WrapReadWhileWriter(h http.HandlerFunc, rwr *RWR) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + log.Infoln("ReadWhileWriter starting") + if r.Method != http.MethodGet { + // only GETs use RWR. 
TODO: determine if we should rwr HEAD + h(w, r) + return + } + user, err := auth.GetCurrentUser(r.Context()) + if err != nil { + h(w, r) // not our job to error, pass the request along + return + } + // Note no error doesn't mean a valid user, there may be no user (which is allowed for some endpoints). + // In that case, user.TenantID will be TenantIDInvalid. + // Which is fine, we'll use that as a cache key like any valid tenant, + // and all unauthenticated requests will share requests. + + // TODO: add "allowed to request no-cache" as a Tenant Role Permission + if user.TenantID != auth.TenantIDInvalid && requestNoCache(r) { + h(w, r) + return + } + + cacheKey := makeRWRCacheKey(r, user) + + multiRequestWrite(h, w, r, rwr, cacheKey) + } +} + +type ReqObj struct { + Body []byte + Code int + Headers http.Header + // TODO add headers +} + +func multiRequestWrite(h http.HandlerFunc, w http.ResponseWriter, r *http.Request, rwr *RWR, cacheKey CacheKey) { + log.Infoln("RWR starting") + if reqChan := rwr.GetOrMakeQueue(cacheKey); reqChan != nil { + log.Infoln("RWR: GetOrMakeQueue loaded, there's another concurrent reader, queueing up") + // we loaded, so a request is ongoing, we need to queue up + obj := <-reqChan + util.WriteHeaders(w, obj.Headers) + if obj.Code != 200 && obj.Code != 0 { + log.Infof("RWR: '"+string(cacheKey)+"' concurrent writing code %v\n", obj.Code) + w.WriteHeader(obj.Code) + } else { + log.Infof("RWR: '" + string(cacheKey) + "' concurrent had no code, not writing code\n") + } + w.Write(obj.Body) + return + } + + log.Infoln("RWR GetOrMakeQueue made queue, no concurrent reader") + + // we didn't load, so we're the first - make the req, respond to queued requestors, and close the queue. 
+ + // To test the read-while-writer (which is normally very fast and difficult to test), + // you can uncomment the following lines: + // log.Infoln("DEBUG multiRequestWrite debug sleep") + // time.Sleep(time.Second * 10) // debug + + iw := &util.FullInterceptor{W: w} + h(iw, r) + + // If the StatusKey Context was set, prioritize it + ctx := r.Context() + val := ctx.Value(tc.StatusKey) + status, ok := val.(int) + if ok { + iw.Code = status + } + + util.WriteHeaders(w, iw.Headers) + if iw.Code != 200 && iw.Code != 0 { + log.Infof("RWR: '"+string(cacheKey)+"' writing code %v\n", iw.Code) + w.WriteHeader(iw.Code) + } else { + log.Infof("RWR: '" + string(cacheKey) + "' had no code, not writing code\n") + } + w.Write(iw.Body) + + // run in a goroutine, so we don't block this routine, and the http request can finish + // TODO test performance, vs closing the writer and calling WriteQueue in this goroutine + // TODO test performance, of dedicated QueueWriter goroutine(s) + go rwr.WriteQueue(cacheKey, ReqObj{Body: iw.Body, Code: iw.Code, Headers: iw.Headers}) +} + +func makeRWRCacheKey(r *http.Request, user *auth.CurrentUser) CacheKey { + // TODO combine with makeSmallCacheKey, when both are merged. Review comment: This function exists in this PR, so why the `TODO`? ########## File path: traffic_ops/traffic_ops_golang/routing/middleware/smallcache.go ########## @@ -0,0 +1,346 @@ +package middleware + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/apache/trafficcontrol/lib/go-log" + "github.com/apache/trafficcontrol/lib/go-rfc" + "github.com/apache/trafficcontrol/lib/go-tc" + "github.com/apache/trafficcontrol/lib/go-util" + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/auth" +) + +func SmallCacheWrapper(span time.Duration, disableRWR bool) Middleware { + cache := NewSmallCache(span) + rwr := (*RWR)(nil) + if !disableRWR { + rwr = NewRWR() + } + return func(h http.HandlerFunc) http.HandlerFunc { + return WrapSmallCache(h, cache, rwr) + } +} + +// WrapSmallCache is middleware that adds a small cache to the server, for the same request by the same tenant. +// This is very important for control plane things, like caches requesting newly changed config all at once. +// This should generally be around 1 second. Configuring this over 5 seconds or so is strongly discouraged. +// +// Note the configured cache time will be how long new changes are potentially unavailable to requestors. +// So a 5s time means new changes will delay propogating to all clients, tenants and the control-plane, for 5 seconds. +// +// This doesn't follow HTTP Cache Control rules, or respect client requests for things like max-age. +// It does however return a proper Age header. 
+// +func WrapSmallCache(h http.HandlerFunc, cache *SmallCache, rwr *RWR) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + log.Infoln("SmallCache starting") + if cache.CacheTime == 0 && rwr == nil { + log.Infoln("SmallCache cache and rwr disabled, skipping entirely") + h(w, r) + return + } + if r.Method != http.MethodGet { + log.Infoln("SmallCache method not get, skipping entirely") + // only GETs are cached. TODO: determine if we should cache HEAD + h(w, r) + + // See RFC7234§4.4 + // Note this reduces the race, it does not eliminate it. + // Using the cache or read-while-writer is always a race. + // Clients which need to avoid that race must send a no-cache. + if !rfc.MethodIsSafe(r.Method) { + cache.InvalidatePath(r.URL.Path) + } + + return + } + + user, err := auth.GetCurrentUser(r.Context()) + if err != nil { + log.Infoln("SmallCache error getting user, skipping entirely") + h(w, r) // not our job to error, pass the request along + return + } + + // Note no error doesn't mean a valid user, there may be no user (which is allowed for some endpoints). + // In that case, user.TenantID will be TenantIDInvalid. + // Which is fine, we'll use that as a cache key like any valid tenant, + // and all unauthenticated requests will share a cache. 
+ + // TODO: add "allowed to request no-cache" as a Tenant Role Permission + if user.TenantID != auth.TenantIDInvalid && requestNoCache(r) { + log.Infoln("SmallCache had valid tenant who sent no-cache, skipping entirely") + h(w, r) + return + } + + cacheKey := makeSmallCacheKey(r, user) + + if cache.CacheTime == 0 { + log.Infoln("SmallCache: '" + cacheKey + "' cache disabled, fetching") + smallCacheInterceptAndCache(h, w, r, cache, rwr, cacheKey) + return + } + + iCacheObj, ok := cache.Cache.Load(cacheKey) + if !ok { + log.Infoln("SmallCache: '" + cacheKey + "' not in cache, fetching") + smallCacheInterceptAndCache(h, w, r, cache, rwr, cacheKey) + return + } + + cacheObj, ok := iCacheObj.(CacheObj) + if !ok { + log.Infoln("SmallCache: '"+cacheKey+"' cache object typecast fail! Should never happen! type is %T\n", iCacheObj) + h(w, r) + return + } + + age := time.Since(cacheObj.Time) + if age > cache.CacheTime { + log.Infoln("SmallCache: in cache expired, fetching") + cache.Cache.Delete(cacheKey) + smallCacheInterceptAndCache(h, w, r, cache, rwr, cacheKey) + return + } + + log.Infoln("SmallCache: '" + string(cacheKey) + "' in cache fresh, returning cached") + smallCacheWriteObj(w, r, cacheObj) + } +} + +// requestNoCache returns whether the client requested not to be served from cache. 
+func requestNoCache(req *http.Request) bool { + if req.Header.Get("Cache-Control") == "" { + pragmaHdr := req.Header.Get("Pragma") + if pragmaHdr != "" && strings.Contains(strings.ToLower(pragmaHdr), "no-cache") { + return true + } + } + cc := rfc.ParseCacheControl(req.Header) + if _, ok := cc["no-cache"]; ok { + return true + } + if _, ok := cc["no-store"]; ok { + return true + } + if val, ok := cc["max-age"]; ok { + if val == `0` || val == `"0"` { + return true + } + } + return false +} + +func smallCacheInterceptAndCache(h http.HandlerFunc, w http.ResponseWriter, r *http.Request, cache *SmallCache, rwr *RWR, cacheKey CacheKey) { + if cache.CacheTime != 0 { + lastGC := (*time.Time)(atomic.LoadPointer(cache.LastGC)) + if cache.CacheTime != 0 && time.Since(*lastGC) > SmallCacheGCInterval { + go func() { cache.GC() }() + } + } + + if rwr != nil { + log.Infoln("RWR starting") + if reqChan := rwr.GetOrMakeQueue(cacheKey); reqChan != nil { + log.Infoln("RWR: GetOrMakeQueue loaded '" + cacheKey + "', there's another concurrent reader, queueing up") + // we loaded, so a request is ongoing, we need to queue up + obj := <-reqChan + if obj.Code != 200 { + log.Infof("smallCacheInterceptAndCache: '"+string(cacheKey)+"' writing code %v\n", obj.Code) + w.WriteHeader(obj.Code) + } else { + log.Infof("smallCacheInterceptAndCache: '" + string(cacheKey) + "' had no code, not writing code\n") + } + log.Infoln("RWR: '" + cacheKey + "' Writing concurrent body and returning") + w.Write(obj.Body) + // return without writing to the cache: only the first RWR caller needs to write to the cache + return + } + } else { + log.Infoln("RWR disabled") + } + + log.Infoln("SmallCache starting interceptor, calling next handler") + iw := &util.FullInterceptor{W: w} + h(iw, r) + log.Infoln("SmallCache intercepted, processing") + + // If the StatusKey Context was set, prioritize it + ctx := r.Context() + val := ctx.Value(tc.StatusKey) + status, ok := val.(int) + if ok { + iw.Code = status + } + + 
log.Infoln("SmallCache: '" + cacheKey + "' got original object") + + if cache.CacheTime != 0 { + cache.Cache.Store(cacheKey, CacheObj{ + Time: time.Now(), + Body: iw.Body, + Code: iw.Code, + Headers: iw.Headers, + }) + } + + util.WriteHeaders(w, iw.Headers) + if iw.Code != 0 { + log.Infof("SmallCache: '"+string(cacheKey)+"' writing code %v\n", iw.Code) + w.WriteHeader(iw.Code) + } else { + log.Infof("SmallCache: '" + string(cacheKey) + "' had no code, not writing code\n") + } + + log.Infoln("SmallCache: '" + string(cacheKey) + "' writing original object") + + log.Infoln("SmallCache about to write to real writer") + w.Write(iw.Body) + log.Infoln("SmallCache wrote to real writer") + + if rwr != nil { + log.Infoln("RWR: '" + cacheKey + "' starting thread to write to queued readers") + // run in a goroutine, so we don't block this routine, and the http request can finish + // TODO test performance, vs closing the writer (?) and calling WriteQueue in this goroutine + // TODO test performance, of dedicated QueueWriter goroutine(s) + go rwr.WriteQueue(cacheKey, ReqObj{Body: iw.Body, Headers: iw.Headers, Code: iw.Code}) + } +} + +func smallCacheWriteObj(w http.ResponseWriter, r *http.Request, obj CacheObj) { + w.Header().Set("Age", strconv.FormatInt(int64(time.Since(obj.Time)/time.Second)+1, 10)) + util.WriteHeaders(w, obj.Headers) + if obj.Code != 200 { + log.Infof("SmallCache: '"+""+"' WriteObj writing code %v\n", obj.Code) + w.WriteHeader(obj.Code) + } else { + log.Infof("SmallCache: '" + "" + "' WriteObj had no code, not writing code\n") + } + w.Write(obj.Body) +} + +func makeSmallCacheKey(r *http.Request, user *auth.CurrentUser) CacheKey { + // TODO confirm role is necessary, and capabilities aren't + // i.e. different users on the same tenant can't have different caps, right? Review comment: I'm pretty sure we'll be able to have users on the same tenant with different capabilities. 
########## File path: docs/source/admin/traffic_ops.rst ########## @@ -415,6 +415,8 @@ This file deals with the configuration parameters of running Traffic Ops itself. :log_location_event: This optional field, if specified, should either be the location of a file to which event-level output will be logged, or one of the special strings ``"stdout"`` which indicates that STDOUT should be used, ``"stderr"`` which indicates that STDERR should be used or ``"null"`` which indicates that no output of this level should be generated. An empty string (``""``) and literally ``null`` are equivalent to ``"null"``. Default if not specified is ``"null"``. :log_location_info: This optional field, if specified, should either be the location of a file to which informational-level output will be logged, or one of the special strings ``"stdout"`` which indicates that STDOUT should be used, ``"stderr"`` which indicates that STDERR should be used or ``"null"`` which indicates that no output of this level should be generated. An empty string (``""``) and literally ``null`` are equivalent to ``"null"``. Default if not specified is ``"null"``. :log_location_warning: This optional field, if specified, should either be the location of a file to which warning-level output will be logged, or one of the special strings ``"stdout"`` which indicates that STDOUT should be used, ``"stderr"`` which indicates that STDERR should be used or ``"null"`` which indicates that no output of this level should be generated. An empty string (``""``) and literally ``null`` are equivalent to ``"null"``. Default if not specified is ``"null"``. + :cache_ms: Milliseconds to cache requests from the database in memory, for performance. This should generally be small, values above 5 seconds are discouraged. Set 0 to disable the cache. Default if not set is 1000ms. + :disable_read_while_writer: Disable read-while-writer functionality. 
Should generally only be set if issues are observed around read-while-write or request caching. Default if not set is false. Review comment: How do these settings work together? If the cache is disabled, does TO still do RWR? ########## File path: traffic_ops/traffic_ops_golang/routing/middleware/readwhilewriter.go ########## @@ -0,0 +1,197 @@ +package middleware + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "net/http" + "strconv" + "sync" + + "github.com/apache/trafficcontrol/lib/go-log" + "github.com/apache/trafficcontrol/lib/go-tc" + "github.com/apache/trafficcontrol/lib/go-util" + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/auth" +) + +// ReadWhileWriterWrapper blocks multiple requests for the same object, +// and makes a single request to the real handler (which is probably expensive, e.g. database calls), +// and returns the result to all callers. 
+func ReadWhileWriterWrapper() Middleware { + rwr := NewRWR() + return func(h http.HandlerFunc) http.HandlerFunc { + return WrapReadWhileWriter(h, rwr) + } +} + +func WrapReadWhileWriter(h http.HandlerFunc, rwr *RWR) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + log.Infoln("ReadWhileWriter starting") + if r.Method != http.MethodGet { + // only GETs use RWR. TODO: determine if we should rwr HEAD + h(w, r) + return + } + user, err := auth.GetCurrentUser(r.Context()) + if err != nil { + h(w, r) // not our job to error, pass the request along + return + } + // Note no error doesn't mean a valid user, there may be no user (which is allowed for some endpoints). + // In that case, user.TenantID will be TenantIDInvalid. + // Which is fine, we'll use that as a cache key like any valid tenant, + // and all unauthenticated requests will share requests. + + // TODO: add "allowed to request no-cache" as a Tenant Role Permission + if user.TenantID != auth.TenantIDInvalid && requestNoCache(r) { + h(w, r) + return + } + + cacheKey := makeRWRCacheKey(r, user) + + multiRequestWrite(h, w, r, rwr, cacheKey) + } +} + +type ReqObj struct { + Body []byte + Code int + Headers http.Header + // TODO add headers Review comment: Aren't they already added on L76? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
