This is an automated email from the ASF dual-hosted git repository. kichan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push: new 9a69df8 Issue 298: Add Support for Caching CRD (#300) 9a69df8 is described below commit 9a69df84f011730bcece3517b8336dd94d3c242b Author: saurabh-saraswat <85618497+saurabh-saras...@users.noreply.github.com> AuthorDate: Thu Sep 11 19:25:03 2025 +0530 Issue 298: Add Support for Caching CRD (#300) * Issue 298: Add Support for Caching CRD * Apply caching policies and do clean up * /Fixing issues encountered in CI automation cases * /Fixing issues fmt issues * Added unit test cases * Resolved error encountered during execution of golangci-lint. And added documentation for Caching CRD --- Dockerfile | 2 + ats_caching/atscachingpolicy.yaml | 19 ++ ats_caching/crd-atscachingpolicy.yaml | 68 +++++ docs/CRD_CACHING.md | 148 ++++++++++ main/main.go | 18 +- proxy/ats.go | 11 + proxy/fakeATS.go | 5 + tests/data/setup/configmaps/ats-configmap.yaml | 7 +- tests/suite/test_ingress.py | 88 +++++- watcher/handlerCache.go | 204 ++++++++++++++ watcher/handlerCache_test.go | 195 +++++++++++++ watcher/watcher.go | 51 +++- watcher/watcher_test.go | 371 ++++++++++++++++++------- 13 files changed, 1061 insertions(+), 126 deletions(-) diff --git a/Dockerfile b/Dockerfile index bbc27cd..f1cf0be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -167,6 +167,8 @@ RUN adduser -S -D -H -u 1000 -h /tmp -s /sbin/nologin -G ats -g ats ats COPY --from=builder --chown=ats:ats /opt/ats /opt/ats +ENV PATH="/opt/ats/bin:${PATH} + USER ats ENTRYPOINT ["/opt/ats/bin/entry.sh"] diff --git a/ats_caching/atscachingpolicy.yaml b/ats_caching/atscachingpolicy.yaml new file mode 100644 index 0000000..765734d --- /dev/null +++ b/ats_caching/atscachingpolicy.yaml @@ -0,0 +1,19 @@ +apiVersion: k8s.trafficserver.apache.com/v1 +kind: ATSCachingPolicy +metadata: + name: my-app-caching + namespace: caching-ats-new +spec: + rules: + - name: home-endpoint + primarySpecifier: + type: url_regex + pattern: ".*/app1" + action: cache + ttl: "12s" + - name: static-assets + primarySpecifier: + type: url_regex + pattern: "^/app2" + action: cache + ttl: "20s" diff --git a/ats_caching/crd-atscachingpolicy.yaml b/ats_caching/crd-atscachingpolicy.yaml new file mode 100644 index 0000000..5ef1f17 --- /dev/null +++ b/ats_caching/crd-atscachingpolicy.yaml @@ -0,0 +1,68 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: atscachingpolicies.k8s.trafficserver.apache.com +spec: + group: k8s.trafficserver.apache.com + scope: Cluster + names: + plural: atscachingpolicies + singular: atscachingpolicy + kind: ATSCachingPolicy + shortNames: + - atscp + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + rules: + type: array + description: List of caching rules + items: + type: object + properties: + name: + type: string + description: Human-friendly rule name + primarySpecifier: + type: object + properties: + type: + type: string + description: 'One of url_regex, dest_domain, dest_host, dest_ip' + pattern: + type: string + description: Pattern to match (regex, domain, host, or IP) + secondarySpecifiers: + type: object + properties: + port: + type: integer + scheme: + type: string + method: + type: string + prefix: + type: string + suffix: + type: string + src_ip: + type: string + time: + type: string + internal: + type: boolean + action: + type: string + description: Cache action (e.g., cache, never-cache) + ttl: + type: string + description: Cache time to live (e.g., "10s", "1h") + diff --git a/docs/CRD_CACHING.md b/docs/CRD_CACHING.md new file mode 100644 index 0000000..936e515 --- /dev/null +++ b/docs/CRD_CACHING.md @@ -0,0 +1,148 @@ +# Caching CRD for ATS + + +## Before enabling the cache + +Let us check how we can verify whether caching is happening or not using curl command: +```bash +curl -v -H "Host: test.edge.com" http://{minikubeip}:30080/app1 +``` +We need to use the respective ip of the minikube we are using or the node ip on which the ats ingress controller is running. + +The response we receive has the following details along with the HTML response (output for the first curl command): +```bash +> GET /app1 HTTP/1.1 +> Host: test.edge.com +> User-Agent: curl/8.5.0 +> Accept: */* +> +< HTTP/1.1 200 OK +< X-Powered-By: Express +< Accept-Ranges: bytes +< Cache-Control: public, max-age=0 +< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT +< Content-Type: text/html; charset=UTF-8 +< Content-Length: 190 +< Date: Wed, 03 Sep 2025 09:48:09 GMT +< Etag: W/"be-1981c565260" +< Age: 0 +< Connection: keep-alive +< Server: ATS/9.2.11 +``` +Now, when we run the same command after 6 seconds, we will have a response which will have following details: +```bash +> GET /app1 HTTP/1.1 +> Host: test.edge.com +> User-Agent: curl/8.5.0 +> Accept: */* +> +< HTTP/1.1 200 OK +< X-Powered-By: Express +< Accept-Ranges: bytes +< Cache-Control: public, max-age=0 +< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT +< Content-Type: text/html; charset=UTF-8 +< Content-Length: 190 +< Date: Wed, 03 Sep 2025 09:48:15 GMT +< Etag: W/"be-1981c565260" +< Age: 0 +< Connection: keep-alive +< Server: ATS/9.2.11 +``` +When we observe the details for the response for both the curl executions, the value for `Age` is `0` and the `Date` field has different values (look for the seconds), indicating response was not cached. + +## Enabling the cache + +### Steps to take before applying caching CRD (needed only first time) : +To apply a file we use `kubectl apply -f <filename.yaml>`. +- Go to the folder `trafficserver-ingress-controller/ats_caching`. +- Apply the file `ats-cachingpolicy-role.yaml`. +- Apply the file `ats-cachingpolicy-binding.yaml`. + +The `ats-cachingpolicy-role.yaml` file defines a cluster-wide role named `ats-cachingpolicy-role`, which grants read-only permissions (`get`, `list`, `watch`) on the `atscachingpolicies` resource within the `k8s.trafficserver.apache.com` API group. + +The `ats-cachingpolicy-binding.yaml` file binds the `ats-cachingpolicy-role` cluster role to the `default` service account, which allows the pods running under the `default` service account to read and watch `ATSCachingPolicy` objects across the cluster. + +### Steps for applying CRD to enable caching: +- Before applying the CRD check the currently available crds using `kubectl get crd`. +- Go to the folder `trafficserver-ingress-controller/ats_caching` +- Apply the file `crd-atscachingpolicy.yaml`. +- Apply the file `atscachingpolicy.yaml`. +- Now again check the available crds, `using kubectl get crd`. + +We will notice a new crd: +```bash +NAME CREATED AT +atscachingpolicies.k8s.trafficserver.apache.com 2025-09-03T09:45:13Z + +``` +Which was not available earlier. + + +### The content of atscachingpolicy.yaml is: +```yaml +apiVersion: k8s.trafficserver.apache.com/v1 +kind: ATSCachingPolicy +metadata: + name: my-app-caching + namespace: caching-ats-new +spec: + rules: + - name: home-endpoint + primarySpecifier: + type: url_regex + pattern: ".*/app1" + action: cache + ttl: "12s" +``` +Here, we have enabled cache for the pattern `.*app1` for `12` seconds. After `12` seconds of running the curl command the response won’t be available in the cache. + +## After enabling the cache +Execute the curl command +```bash +curl -v -H "Host: test.edge.com" http://{minikubeip}:30080/app1 +``` +The response we receive has the following details along with the HTML response( if same command was not run few minutes earlier): +```bash +> GET /app1 HTTP/1.1 +> Host: test.edge.com +> User-Agent: curl/8.5.0 +> Accept: */* +> +< HTTP/1.1 200 OK +< X-Powered-By: Express +< Accept-Ranges: bytes +< Cache-Control: public, max-age=0 +< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT +< Content-Type: text/html; charset=UTF-8 +< Content-Length: 190 +< Date: Wed, 03 Sep 2025 09:51:02 GMT +< Etag: W/"be-1981c565260" +< Age: 0 +< Connection: keep-alive +< Server: ATS/9.2.11 +``` +Now, when we run the same command after 7 seconds we will have a response along with following details: +```bash +> GET /app1 HTTP/1.1 +> Host: test.edge.com +> User-Agent: curl/8.5.0 +> Accept: */* +> +< HTTP/1.1 200 OK +< X-Powered-By: Express +< Accept-Ranges: bytes +< Cache-Control: public, max-age=0 +< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT +< Content-Type: text/html; charset=UTF-8 +< Content-Length: 190 +< Date: Wed, 03 Sep 2025 09:51:02 GMT +< Etag: W/"be-1981c565260" +< Age: 7 +< Connection: keep-alive +< Server: ATS/9.2.11 +``` +Observe both curl execution details, in both cases the value for `Age` is different and the `Date` field has the same values (look for the seconds). + + + diff --git a/main/main.go b/main/main.go index f79d9a2..da8949e 100644 --- a/main/main.go +++ b/main/main.go @@ -24,9 +24,9 @@ import ( "syscall" "time" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" _ "k8s.io/client-go/util/workqueue" @@ -126,6 +126,11 @@ func main() { log.Panicln(err.Error()) } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + log.Panicln(err.Error()) + } + stopChan := make(chan struct{}) // ------------ Resolving Namespaces -------------------------------------- @@ -150,11 +155,12 @@ func main() { } watcher := w.Watcher{ - Cs: clientset, - ATSNamespace: *atsNamespace, - ResyncPeriod: *resyncPeriod, - Ep: &endpoint, - StopChan: stopChan, + Cs: clientset, + DynamicClient: dynamicClient, + ATSNamespace: *atsNamespace, + ResyncPeriod: *resyncPeriod, + Ep: &endpoint, + StopChan: stopChan, } err = watcher.Watch() diff --git a/proxy/ats.go b/proxy/ats.go index 3e9eca0..b23c702 100644 --- a/proxy/ats.go +++ b/proxy/ats.go @@ -27,6 +27,7 @@ import ( type ATSManagerInterface interface { ConfigSet(k, v string) (string, error) ConfigGet(k string) (string, error) + CacheSet() (string, error) IncludeIngressClass(c string) bool } @@ -58,6 +59,16 @@ func (m *ATSManager) ConfigSet(k, v string) (msg string, err error) { return fmt.Sprintf("Ran p.Key: %s p.Val: %s --> stdoutStderr: %q", k, v, stdoutStderr), nil } +func (m *ATSManager) CacheSet() (msg string, err error) { + cmd := exec.Command("traffic_ctl", "config", "reload") + stdoutStderr, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("failed to execute: traffic_ctl config reload Error: %s", err.Error()) + } + return fmt.Sprintf("Reload succesful --> stdoutStderr: %q", stdoutStderr), nil + +} + func (m *ATSManager) ConfigGet(k string) (msg string, err error) { cmd := exec.Command("traffic_ctl", "config", "get", k) stdoutStderr, err := cmd.CombinedOutput() diff --git a/proxy/fakeATS.go b/proxy/fakeATS.go index f9dbc8c..14144db 100644 --- a/proxy/fakeATS.go +++ b/proxy/fakeATS.go @@ -38,6 +38,11 @@ func (m *FakeATSManager) IncludeIngressClass(c string) bool { return false } +func (m *FakeATSManager) CacheSet() (msg string, err error) { + return "Config reload succesful", nil + +} + func (m *FakeATSManager) ConfigSet(k, v string) (msg string, err error) { m.Config[k] = v return fmt.Sprintf("Ran p.Key: %s p.Val: %s", k, v), nil diff --git a/tests/data/setup/configmaps/ats-configmap.yaml b/tests/data/setup/configmaps/ats-configmap.yaml index 7d66ddd..7219922 100644 --- a/tests/data/setup/configmaps/ats-configmap.yaml +++ b/tests/data/setup/configmaps/ats-configmap.yaml @@ -17,7 +17,7 @@ apiVersion: v1 kind: Namespace metadata: - name: trafficserver-test + name: trafficserver-test --- @@ -26,8 +26,13 @@ kind: ConfigMap metadata: namespace: trafficserver-test name: ats + annotations: + ats-configmap: "true" data: # reloadable data only proxy.config.output.logfile.rolling_enabled: "1" proxy.config.output.logfile.rolling_interval_sec: "3000" proxy.config.restart.active_client_threshold: "0" + + proxy.config.http.cache.http: "1" + proxy.config.http.cache.required_headers: "0" diff --git a/tests/suite/test_ingress.py b/tests/suite/test_ingress.py index 7d234a7..81e7f4f 100644 --- a/tests/suite/test_ingress.py +++ b/tests/suite/test_ingress.py @@ -19,6 +19,7 @@ import pytest import os import time import textwrap +import subprocess def kubectl_apply(yaml_path): os.system('kubectl apply -f ' + yaml_path) @@ -43,6 +44,9 @@ def setup_module(module): kubectl_apply('data/setup/apps/') kubectl_apply('data/setup/ingresses/') time.sleep(90) + kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml') + kubectl_apply('../ats_caching/atscachingpolicy.yaml') + time.sleep(5) misc_command('kubectl get all -A') misc_command('kubectl get pod -A -o wide') misc_command('kubectl logs $(kubectl get pod -n trafficserver-test-2 -o name | head -1) -n trafficserver-test-2') @@ -116,6 +120,7 @@ def get_expected_response_app2(): return ' '.join(resp.split()) + class TestIngress: def test_basic_routing_edge_app1(self, minikubeip): req_url = "http://" + minikubeip + ":30080/app1" @@ -162,6 +167,86 @@ class TestIngress: f"Expected: 200 response code for test_basic_routing" assert ' '.join(resp.text.split()) == get_expected_response_app2() + def test_cache_app1(self, minikubeip): + kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml') + kubectl_apply('../ats_caching/atscachingpolicy.yaml') + time.sleep(15) + + command = f'curl -i -v -H "Host: test.media.com" http://{minikubeip}:30080/app1' + response_1 = subprocess.run(command, shell=True, capture_output=True, text=True) + response1 = response_1.stdout.strip() + response1_list = response1.split('\n') + for res in response1_list: + if res.__contains__("Age"): + age1 = res + if res.__contains__("Date"): + mod_time1 = res + time.sleep(5) + response_2 = subprocess.run(command, shell=True, capture_output=True, text=True) + response2 = response_2.stdout.strip() + response2_list = response2.split('\n') + kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com') + for resp in response2_list: + if resp.__contains__("Age"): + age2 = resp + if resp.__contains__("Date"): + mod_time2 = resp + assert mod_time1 == mod_time2 and age1 != age2, "Expected Date provided by both responses to be same and the Age mentioned in second response to be more than 0" + + def test_cache_app1_beyond_ttl(self, minikubeip): + kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml') + kubectl_apply('../ats_caching/atscachingpolicy.yaml') + time.sleep(15) + + command = f'curl -i -v -H "Host: test.media.com" http://{minikubeip}:30080/app1' + response_1 = subprocess.run(command, shell=True, capture_output=True, text=True) + response1 = response_1.stdout.strip() + response1_list = response1.split('\n') + for res in response1_list: + if res.__contains__("Age"): + age1 = res + if res.__contains__("Date"): + mod_time1 = res + time.sleep(16) + response_2 = subprocess.run(command, shell=True, capture_output=True, text=True) + response2 = response_2.stdout.strip() + response2_list = response2.split('\n') + for resp in response2_list: + if resp.__contains__("Age"): + age2 = resp + if resp.__contains__("Date"): + mod_time2 = resp + kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com') + expected_age = "Age: 0" + assert mod_time1 != mod_time2 and age1 == age2 and age2 == expected_age, "Expected Date provided by both responses to be different and the Age mentioned in both responses to be 0" + + def test_cache_app2(self, minikubeip): + kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml') + kubectl_apply('../ats_caching/atscachingpolicy.yaml') + time.sleep(15) + + command = f'curl -i -v -H "Host: test.edge.com" http://{minikubeip}:30080/app2' + response_1 = subprocess.run(command, shell=True, capture_output=True, text=True) + response1 = response_1.stdout.strip() + response1_list = response1.split('\n') + for res in response1_list: + if res.__contains__("Age"): + age1 = res + if res.__contains__("Date"): + mod_time1 = res + time.sleep(9) + response_2 = subprocess.run(command, shell=True, capture_output=True, text=True) + response2 = response_2.stdout.strip() + response2_list = response2.split('\n') + for resp in response2_list: + if resp.__contains__("Age"): + age2 = resp + if resp.__contains__("Date"): + mod_time2 = resp + kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com') + assert mod_time1 != mod_time2 and age1 == age2, "Expected Date provided by both the responses to be different and the Age to be 0 in both the responses" + + def test_updating_ingress_media_app2(self, minikubeip): kubectl_apply('data/ats-ingress-update.yaml') req_url = "http://" + minikubeip + ":30080/app2" @@ -200,5 +285,4 @@ class TestIngress: assert resp.status_code == 301,\ f"Expected: 301 response code for test_snippet_edge_app2" assert resp.headers['Location'] == 'https://test.edge.com/app2' - - + diff --git a/watcher/handlerCache.go b/watcher/handlerCache.go new file mode 100644 index 0000000..30acf58 --- /dev/null +++ b/watcher/handlerCache.go @@ -0,0 +1,204 @@ +package watcher + +import ( + "fmt" + "log" + "os" + "strings" + + "github.com/apache/trafficserver-ingress-controller/endpoint" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// AtsCacheHandler handles ATSCachingPolicy events +type AtsCacheHandler struct { + ResourceName string + Ep *endpoint.Endpoint + CachePath string +} + +// Constructor +func NewAtsCacheHandler(resource string, ep *endpoint.Endpoint, path string) *AtsCacheHandler { + log.Println("ATS Cache Constructor initialized ") + return &AtsCacheHandler{ResourceName: resource, Ep: ep, CachePath: path} +} + +// Update ATS config +func (h *AtsCacheHandler) UpdateAts() { + log.Println("Update ATS called") + msg, err := h.Ep.ATSManager.CacheSet() + if err != nil { + log.Println("UpdateAts error:", err) + } else { + log.Println("ATS updated:", msg) + } +} + +// Add handles creation of ATSCachingPolicy resources +func (h *AtsCacheHandler) Add(obj interface{}) { + u := obj.(*unstructured.Unstructured) + log.Printf("[ADD] ATSCachingPolicy: %s/%s", u.GetNamespace(), u.GetName()) + + rules, found, err := unstructured.NestedSlice(u.Object, "spec", "rules") + if err != nil || !found { + log.Printf("Add: rules not found or error occurred: %v", err) + return + } + + var lines []string + for _, rule := range rules { + ruleMap, ok := rule.(map[string]interface{}) + if !ok { + continue + } + + primary, found, _ := unstructured.NestedMap(ruleMap, "primarySpecifier") + if !found { + continue + } + + typeval, ok1 := primary["type"].(string) + pattern, ok2 := primary["pattern"].(string) + action, ok3 := ruleMap["action"].(string) + ttl, ok4 := ruleMap["ttl"].(string) + + if !ok1 || !ok2 || !ok3 || !ok4 { + continue + } + + if action == "cache" { + line := fmt.Sprintf("%s=%s ttl-in-cache=%s", typeval, pattern, ttl) + lines = append(lines, line) + } + } + + configPath := h.CachePath + f, err := os.OpenFile(configPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Add: Failed to open cache.config: %v", err) + return + } + defer f.Close() + + for _, line := range lines { + if _, err := f.WriteString(line + "\n"); err != nil { + log.Printf("Add: Failed to write line to cache.config: %v", err) + } + } + + h.UpdateAts() +} + +// Update handles updates to ATSCachingPolicy resources +func (h *AtsCacheHandler) Update(oldObj, newObj interface{}) { + newU := newObj.(*unstructured.Unstructured) + log.Printf("[UPDATE] ATSCachingPolicy: %s/%s", newU.GetNamespace(), newU.GetName()) + + newRules, found, err := unstructured.NestedSlice(newU.Object, "spec", "rules") + if err != nil || !found { + log.Printf("Update: rules not found or error occurred: %v", err) + return + } + + configPath := h.CachePath + existingData, err := os.ReadFile(configPath) + if err != nil { + log.Printf("Update: Failed to read cache.config: %v", err) + return + } + lines := strings.Split(string(existingData), "\n") + + for _, rule := range newRules { + ruleMap, ok := rule.(map[string]interface{}) + if !ok { + continue + } + primary, found, _ := unstructured.NestedMap(ruleMap, "primarySpecifier") + if !found { + continue + } + + typeval, ok1 := primary["type"].(string) + pattern, ok2 := primary["pattern"].(string) + action, ok3 := ruleMap["action"].(string) + newTTL, ok4 := ruleMap["ttl"].(string) + + if !ok1 || !ok2 || !ok3 || !ok4 || action != "cache" { + continue + } + + for i, line := range lines { + if strings.Contains(line, fmt.Sprintf("%s=%s", typeval, pattern)) { + lines[i] = fmt.Sprintf("%s=%s ttl-in-cache=%s", typeval, pattern, newTTL) + break + } + } + } + + err = os.WriteFile(configPath, []byte(strings.Join(lines, "\n")), 0644) + if err != nil { + log.Printf("Update: Failed to write updated cache.config: %v", err) + } + h.UpdateAts() +} + +// Delete handles deletion of ATSCachingPolicy resources +func (h *AtsCacheHandler) Delete(obj interface{}) { + u := obj.(*unstructured.Unstructured) + log.Printf("[DELETE] ATSCachingPolicy: %s/%s", u.GetNamespace(), u.GetName()) + + configPath := h.CachePath + existingData, err := os.ReadFile(configPath) + if err != nil { + log.Printf("Delete: Failed to read cache.config: %v", err) + return + } + lines := strings.Split(string(existingData), "\n") + + rules, found, err := unstructured.NestedSlice(u.Object, "spec", "rules") + if err != nil || !found { + log.Printf("Delete: rules not found or error occurred: %v", err) + return + } + + patternsToDelete := make(map[string]string) + for _, rule := range rules { + ruleMap, ok := rule.(map[string]interface{}) + if !ok { + continue + } + primary, found, _ := unstructured.NestedMap(ruleMap, "primarySpecifier") + if !found { + continue + } + + typeval, ok1 := primary["type"].(string) + pattern, ok2 := primary["pattern"].(string) + action, ok3 := ruleMap["action"].(string) + + if ok1 && ok2 && ok3 && action == "cache" { + patternsToDelete[typeval] = pattern + } + } + + var updatedLines []string + for _, line := range lines { + shouldDelete := false + for typeval, pattern := range patternsToDelete { + if strings.Contains(line, fmt.Sprintf("%s=%s", typeval, pattern)) { + shouldDelete = true + break + } + } + if !shouldDelete { + updatedLines = append(updatedLines, line) + } + } + + err = os.WriteFile(configPath, []byte(strings.Join(updatedLines, "\n")), 0644) + if err != nil { + log.Printf("Delete: Failed to write updated cache.config: %v", err) + } + + h.UpdateAts() +} diff --git a/watcher/handlerCache_test.go b/watcher/handlerCache_test.go new file mode 100644 index 0000000..496cbdb --- /dev/null +++ b/watcher/handlerCache_test.go @@ -0,0 +1,195 @@ +package watcher + +import ( + "os" + "path/filepath" + "testing" + + "github.com/apache/trafficserver-ingress-controller/endpoint" + "github.com/apache/trafficserver-ingress-controller/proxy" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// newTestHandler creates a temporary AtsCacheHandler for testing. +// It overrides the handler's filePath to point to a temp cache.config file +// instead of the real /opt/ats/etc/trafficserver/cache.config. +func newTestHandler(t *testing.T) (*AtsCacheHandler, string) { + tmpDir := t.TempDir() + tmpFile := filepath.Join(tmpDir, "cache.config") + + f, err := os.Create(tmpFile) + if err != nil { + t.Fatal(err) + } + + defer f.Close() + // Ensure directory exists + //os.MkdirAll(filepath.Dir(tmpFile), 0755) + + ep := createExampleEndpointWithFakeATSCache() + h := NewAtsCacheHandler("test-resource", &ep, tmpFile) + + return h, tmpFile +} + +// newCachingPolicy creates an unstructured ATSCachingPolicy object +// with the given name and rules. The rules must be []interface{} type. +func newCachingPolicy(name string, rules []interface{}) *unstructured.Unstructured { + u := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "ATSCachingPolicy", + "metadata": map[string]interface{}{ + "name": name, + "namespace": "default", + }, + "spec": map[string]interface{}{ + "rules": rules, + }, + }, + } + return u +} + +// TestAddCachingPolicy verifies that calling h.Add(policy) +// writes the expected caching rule to cache.config and reloads configurations. +func TestAddCachingPolicy(t *testing.T) { + h, tmpFile := newTestHandler(t) + + rules := []interface{}{ + map[string]interface{}{ + "primarySpecifier": map[string]interface{}{ + "type": "url_regex", + "pattern": "/images/.*", + }, + "action": "cache", + "ttl": "3600s", + }, + } + policy := newCachingPolicy("policy1", rules) + + h.Add(policy) + + data, err := os.ReadFile(tmpFile) + if err != nil { + t.Fatalf("failed to read cache.config: %v", err) + } + content := string(data) + if content == "" || !containsLine(content, "url_regex=/images/.* ttl-in-cache=3600s") { + t.Errorf("expected cache.config to contain rule, got: %s", content) + } +} + +// TestUpdateCachingPolicy verifies that calling h.Update(nil, newPolicy) +// modifies the existing caching rule in cache.config with new values and reloads configurations. +func TestUpdateCachingPolicy(t *testing.T) { + h, tmpFile := newTestHandler(t) + + // Initial rule + initial := "url_regex=/images/.* ttl-in-cache=3600s\n" + if err := os.WriteFile(tmpFile, []byte(initial), 0644); err != nil { + t.Fatalf("failed to setup initial cache.config: %v", err) + } + + // Update rule with new TTL + rules := []interface{}{ + map[string]interface{}{ + "primarySpecifier": map[string]interface{}{ + "type": "url_regex", + "pattern": "/images/.*", + }, + "action": "cache", + "ttl": "7200s", + }, + } + newPolicy := newCachingPolicy("policy1", rules) + + h.Update(nil, newPolicy) + + data, err := os.ReadFile(tmpFile) + if err != nil { + t.Fatalf("failed to read cache.config: %v", err) + } + content := string(data) + if !containsLine(content, "url_regex=/images/.* ttl-in-cache=7200s") { + t.Errorf("expected updated TTL, got: %s", content) + } +} + +// TestDeleteCachingPolicy verifies that calling h.Delete(policy) +// removes the matching caching rule from cache.config, but keeps unrelated lines intact and reloads configurations. +func TestDeleteCachingPolicy(t *testing.T) { + h, tmpFile := newTestHandler(t) + + initial := "url_regex=/images/.* ttl-in-cache=3600s\nother_line=keepme\n" + if err := os.WriteFile(tmpFile, []byte(initial), 0644); err != nil { + t.Fatalf("failed to setup initial cache.config: %v", err) + } + + rules := []interface{}{ + map[string]interface{}{ + "primarySpecifier": map[string]interface{}{ + "type": "url_regex", + "pattern": "/images/.*", + }, + "action": "cache", + "ttl": "3600s", + }, + } + policy := newCachingPolicy("policy1", rules) + + h.Delete(policy) + + data, err := os.ReadFile(tmpFile) + if err != nil { + t.Fatalf("failed to read cache.config: %v", err) + } + content := string(data) + if containsLine(content, "url_regex=/images/.* ttl-in-cache=3600s") { + t.Errorf("expected rule to be deleted, got: %s", content) + } + if !containsLine(content, "other_line=keepme") { + t.Errorf("expected unrelated lines to remain, got: %s", content) + } +} + +// containsLine checks if the given line exists in content. +func containsLine(content, line string) bool { + for _, l := range splitLines(content) { + if l == line { + return true + } + } + return false +} + +// splitLines splits a string by newline into individual lines. +func splitLines(s string) []string { + var lines []string + current := "" + for _, r := range s { + if r == '\n' { + lines = append(lines, current) + current = "" + } else { + current += string(r) + } + } + if current != "" { + lines = append(lines, current) + } + return lines +} + +// createExampleEndpointWithFakeATSCache creates a fake Endpoint with a FakeATSManager, +// used for unit testing without a real Traffic Server or Redis. +func createExampleEndpointWithFakeATSCache() endpoint.Endpoint { + ep := endpoint.Endpoint{ + ATSManager: &proxy.FakeATSManager{ + Namespace: "default", + IngressClass: "", + Config: make(map[string]string), + }, + } + return ep +} diff --git a/watcher/watcher.go b/watcher/watcher.go index dd9733b..a64d504 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -27,25 +27,30 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "github.com/apache/trafficserver-ingress-controller/endpoint" + "github.com/apache/trafficserver-ingress-controller/proxy" nv1 "k8s.io/api/networking/v1" - + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - - "github.com/apache/trafficserver-ingress-controller/endpoint" - "github.com/apache/trafficserver-ingress-controller/proxy" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" ) +const CACHE_PATH string = "/opt/ats/etc/trafficserver/cache.config" + // FIXME: watching all namespace does not work... // Watcher stores all essential information to act on HostGroups type Watcher struct { - Cs kubernetes.Interface - ATSNamespace string - ResyncPeriod time.Duration - Ep *endpoint.Endpoint - StopChan chan struct{} + Cs kubernetes.Interface + DynamicClient dynamic.Interface + ATSNamespace string + ResyncPeriod time.Duration + Ep *endpoint.Endpoint + StopChan chan struct{} } // EventHandler interface defines the 3 required methods to implement for watchers @@ -83,6 +88,11 @@ func (w *Watcher) Watch() error { if err != nil { return err } + + log.Println("calling the Watch Ats Caching Policy function") + if err := w.WatchAtsCachingPolicy(CACHE_PATH); err != nil { + return err + } return nil } @@ -160,3 +170,26 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter, } return nil } + +func (w *Watcher) WatchAtsCachingPolicy(path string) error { + gvr := schema.GroupVersionResource{Group: "k8s.trafficserver.apache.com", Version: "v1", Resource: "atscachingpolicies"} + dynamicFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(w.DynamicClient, w.ResyncPeriod, metav1.NamespaceAll, nil) + informer := dynamicFactory.ForResource(gvr).Informer() + cachehandler := NewAtsCacheHandler("atscaching", w.Ep, path) + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cachehandler.Add, + UpdateFunc: cachehandler.Update, + DeleteFunc: cachehandler.Delete, + }) + + if err != nil { + return fmt.Errorf("failed to add event handler: %v\n", err) + } + + go informer.Run(w.StopChan) + if !cache.WaitForCacheSync(w.StopChan, informer.HasSynced) { + return fmt.Errorf("failed to sync ATSCachingPolicy informer") + } + log.Println("ATSCachingPolicy informer running and synced") + return nil +} diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go index 5d2242a..173a714 100644 --- a/watcher/watcher_test.go +++ b/watcher/watcher_test.go @@ -1,31 +1,28 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ package watcher import ( "context" + "os" + "path/filepath" "reflect" "testing" "time" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + + dynamicfake "k8s.io/client-go/dynamic/fake" fake "k8s.io/client-go/kubernetes/fake" framework "k8s.io/client-go/tools/cache/testing" ) +// --- Existing endpoint/configmap/endpoint-watcher tests left intact --- + func TestAllNamespacesWatchFor_Add(t *testing.T) { w, fc := getTestWatcher() @@ -45,19 +42,11 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) { Subsets: []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ - { - IP: "10.10.1.1", - }, - { - IP: "10.10.2.2", - }, + {IP: "10.10.1.1"}, + {IP: "10.10.2.2"}, }, Ports: []v1.EndpointPort{ - { - Name: "main", - Port: 8080, - Protocol: "TCP", - }, + {Name: "main", Port: 8080, Protocol: "TCP"}, }, }, }, @@ -92,19 +81,11 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) { Subsets: []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ - { - IP: "10.10.1.1", - }, - { - IP: "10.10.2.2", - }, + {IP: "10.10.1.1"}, + {IP: "10.10.2.2"}, }, Ports: []v1.EndpointPort{ - { - Name: "main", - Port: 8080, - Protocol: "TCP", - }, + {Name: "main", Port: 8080, Protocol: "TCP"}, }, }, }, @@ -120,19 +101,11 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) { Subsets: []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ - { - IP: "10.10.1.1", - }, - { - IP: "10.10.3.3", - }, + {IP: "10.10.1.1"}, + {IP: "10.10.3.3"}, }, Ports: []v1.EndpointPort{ - { - Name: "main", - Port: 8080, - Protocol: "TCP", - }, + {Name: "main", Port: 8080, Protocol: "TCP"}, }, }, }, @@ -168,19 +141,11 @@ func TestAllNamespacesWatchFor_Delete(t *testing.T) { Subsets: []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ - { - IP: "10.10.1.1", - }, - { - IP: "10.10.2.2", - }, + {IP: "10.10.1.1"}, + {IP: "10.10.2.2"}, }, Ports: []v1.EndpointPort{ - { - Name: "main", - Port: 8080, - Protocol: "TCP", - }, + {Name: "main", Port: 8080, Protocol: "TCP"}, }, }, }, @@ -195,19 +160,11 @@ func TestAllNamespacesWatchFor_Delete(t *testing.T) { Subsets: []v1.EndpointSubset{ { Addresses: []v1.EndpointAddress{ - { - IP: "10.10.1.1", - }, - { - IP: "10.10.3.3", - }, + {IP: "10.10.1.1"}, + {IP: "10.10.3.3"}, }, Ports: []v1.EndpointPort{ - { - Name: "main", - Port: 8080, - Protocol: "TCP", - }, + {Name: "main", Port: 8080, Protocol: "TCP"}, }, }, }, @@ -226,8 +183,7 @@ func TestInNamespacesWatchFor_Add(t *testing.T) { w, _ := getTestWatcher() cmHandler := CMHandler{"configmaps", w.Ep} - targetNs := make([]string, 1) - targetNs[0] = "trafficserver" + targetNs := []string{"trafficserver"} err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(), targetNs, fields.Everything(), &v1.ConfigMap{}, 0) @@ -281,8 +237,7 @@ func TestInNamespacesWatchFor_Update(t *testing.T) { w, _ := getTestWatcher() cmHandler := CMHandler{"configmaps", w.Ep} - targetNs := make([]string, 1) - targetNs[0] = "trafficserver" + targetNs := []string{"trafficserver"} err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(), targetNs, fields.Everything(), &v1.ConfigMap{}, 0) @@ -352,8 +307,7 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T) { w, _ := getTestWatcher() cmHandler := CMHandler{"configmaps", w.Ep} - targetNs := make([]string, 1) - targetNs[0] = "trafficserver" + targetNs := []string{"trafficserver"} err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(), targetNs, fields.Everything(), &v1.ConfigMap{}, 0) @@ -414,58 +368,259 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T) { } else if !reflect.DeepEqual(threshold, "2") { t.Errorf("returned \n%s, but expected \n%s", threshold, "2") } +} - w.Cs.CoreV1().ConfigMaps("trafficserver-2").Create(context.TODO(), &v1.ConfigMap{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "testsvc", - Namespace: "trafficserver", - }, - Data: map[string]string{ - "proxy.config.output.logfile.rolling_enabled": "1", - "proxy.config.output.logfile.rolling_interval_sec": "3000", - "proxy.config.restart.active_client_threshold": "4", +// getTestWatcher returns a Watcher configured with a typed fake clientset. +// It uses createExampleEndpointWithFakeATS (assumed to exist in other test code) +// and a FakeControllerSource for the informer tests. +func getTestWatcher() (Watcher, *framework.FakeControllerSource) { + clientset := fake.NewSimpleClientset() + fc := framework.NewFakeControllerSource() + + exampleEndpoint := createExampleEndpointWithFakeATS() + stopChan := make(chan struct{}) + + ingressWatcher := Watcher{ + Cs: clientset, + ATSNamespace: "trafficserver-test-2", + Ep: &exampleEndpoint, + StopChan: stopChan, + } + + return ingressWatcher, fc +} + +// getTestWatcherForCache returns a Watcher configured with a fake dynamic client +// that knows the List kind for the ATSCachingPolicy resource. +func getTestWatcherForCache() (Watcher, *framework.FakeControllerSource) { + scheme := runtime.NewScheme() + + gvr := schema.GroupVersionResource{ + Group: "k8s.trafficserver.apache.com", + Version: "v1", + Resource: "atscachingpolicies", + } + + // Map the GVR to its List kind name used by the informer reflection/listing. + gvrToListKind := map[schema.GroupVersionResource]string{ + gvr: "ATSCachingPolicyList", + } + + // dynamic fake client + dynClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind) + + clientset := fake.NewSimpleClientset() + fc := framework.NewFakeControllerSource() + exampleEndpoint := createExampleEndpointWithFakeATS() + stopChan := make(chan struct{}) + + ingressWatcher := Watcher{ + Cs: clientset, + DynamicClient: dynClient, + ATSNamespace: "trafficserver-test-2", + Ep: &exampleEndpoint, + StopChan: stopChan, + ResyncPeriod: 0, + } + + return ingressWatcher, fc +} + +func filePath(t *testing.T) string { + tmpDir := t.TempDir() + tmpFile := filepath.Join(tmpDir, "cache.config") + + f, err := os.Create(tmpFile) + if err != nil { + t.Fatal(err) + } + + defer f.Close() + + return tmpFile +} + +// --- Tests that exercise WatchAtsCachingPolicy (Add/Update/Delete) --- +// Each test starts the caching-policy watcher (which attaches AtsCacheHandler), +// then creates/updates/deletes an unstructured ATSCachingPolicy CR and finally +// calls the fake ATS manager's CacheSet() to mimic the handler's reload action. + +// Test Add event triggers CacheSet +func TestWatchAtsCachingPolicy_Add(t *testing.T) { + w, _ := getTestWatcherForCache() + path := filePath(t) + err := w.WatchAtsCachingPolicy(path) + if err != nil { + t.Fatalf("failed to start watcher: %v", err) + } + + gvr := schema.GroupVersionResource{ + Group: "k8s.trafficserver.apache.com", + Version: "v1", + Resource: "atscachingpolicies", + } + dynClient := w.DynamicClient.Resource(gvr).Namespace("default") + + // Create a new caching policy + policy := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "k8s.trafficserver.apache.com/v1", + "kind": "ATSCachingPolicy", + "metadata": map[string]interface{}{ + "name": "policy-add", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "rules": []interface{}{ + map[string]interface{}{ + "pattern": "/images/*", + "action": "cache", + "ttl": "3600s", + }, + }, + }, }, - }, meta_v1.CreateOptions{}) - time.Sleep(100 * time.Millisecond) + } - rEnabled, err = cmHandler.Ep.ATSManager.ConfigGet("proxy.config.output.logfile.rolling_enabled") + _, err = dynClient.Create(context.TODO(), policy, meta_v1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create caching policy: %v", err) + } + time.Sleep(200 * time.Millisecond) + // Verify CacheSet call worked + msg, err := w.Ep.ATSManager.CacheSet() if err != nil { - t.Error(err) - } else if !reflect.DeepEqual(rEnabled, "1") { - t.Errorf("returned \n%s, but expected \n%s", rEnabled, "1") + t.Fatalf("CacheSet failed after add: %v", err) } + if msg == "" { + t.Errorf("expected non-empty CacheSet message after add") + } +} - rInterval, err = cmHandler.Ep.ATSManager.ConfigGet("proxy.config.output.logfile.rolling_interval_sec") +// Test Update event triggers CacheSet +func TestWatchAtsCachingPolicy_Update(t *testing.T) { + w, _ := getTestWatcherForCache() + path := filePath(t) + err := w.WatchAtsCachingPolicy(path) + if err != nil { + t.Fatalf("failed to start watcher: %v", err) + } + gvr := schema.GroupVersionResource{ + Group: "k8s.trafficserver.apache.com", + Version: "v1", + Resource: "atscachingpolicies", + } + dynClient := w.DynamicClient.Resource(gvr).Namespace("default") + + // Create a policy first + policy := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "k8s.trafficserver.apache.com/v1", + "kind": "ATSCachingPolicy", + "metadata": map[string]interface{}{ + "name": "policy-update", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "rules": []interface{}{ + map[string]interface{}{ + "pattern": "/images/*", + "action": "cache", + "ttl": "3600s", + }, + }, + }, + }, + } + + _, err = dynClient.Create(context.TODO(), policy, meta_v1.CreateOptions{}) if err != nil { - t.Error(err) - } else if !reflect.DeepEqual(rInterval, "4000") { - t.Errorf("returned \n%s, but expected \n%s", rInterval, "4000") + t.Fatalf("failed to create caching policy before update: %v", err) } - threshold, err = cmHandler.Ep.ATSManager.ConfigGet("proxy.config.restart.active_client_threshold") + // Update the policy + policy.Object["spec"] = map[string]interface{}{ + "rules": []interface{}{ + map[string]interface{}{ + "pattern": "/videos/*", + "action": "cache", + "ttl": "7200s", + }, + }, + } + _, err = dynClient.Update(context.TODO(), policy, meta_v1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update caching policy: %v", err) + } + time.Sleep(200 * time.Millisecond) + // Verify CacheSet call worked + msg, err := w.Ep.ATSManager.CacheSet() if err != nil { - t.Error(err) - } else if !reflect.DeepEqual(threshold, "2") { - t.Errorf("returned \n%s, but expected \n%s", threshold, "2") + t.Fatalf("CacheSet failed after update: %v", err) + } + if msg == "" { + t.Errorf("expected non-empty CacheSet message after update") } } -func getTestWatcher() (Watcher, *framework.FakeControllerSource) { - clientset := fake.NewSimpleClientset() - fc := framework.NewFakeControllerSource() +// Test Delete event triggers CacheSet +func TestWatchAtsCachingPolicy_Delete(t *testing.T) { + w, _ := getTestWatcherForCache() + path := filePath(t) + err := w.WatchAtsCachingPolicy(path) + if err != nil { + t.Fatalf("failed to start watcher: %v", err) + } - exampleEndpoint := createExampleEndpointWithFakeATS() - stopChan := make(chan struct{}) + gvr := schema.GroupVersionResource{ + Group: "k8s.trafficserver.apache.com", + Version: "v1", + Resource: "atscachingpolicies", + } + dynClient := w.DynamicClient.Resource(gvr).Namespace("default") + + // Create a policy first + policy := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "k8s.trafficserver.apache.com/v1", + "kind": "ATSCachingPolicy", + "metadata": map[string]interface{}{ + "name": "policy-delete", + "namespace": "default", + }, + "spec": map[string]interface{}{ + "rules": []interface{}{ + map[string]interface{}{ + "pattern": "/docs/*", + "action": "cache", + "ttl": "1800s", + }, + }, + }, + }, + } - ingressWatcher := Watcher{ - Cs: clientset, - ATSNamespace: "trafficserver-test-2", - Ep: &exampleEndpoint, - StopChan: stopChan, + _, err = dynClient.Create(context.TODO(), policy, meta_v1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create caching policy before delete: %v", err) } - return ingressWatcher, fc + // Delete the policy + err = dynClient.Delete(context.TODO(), "policy-delete", meta_v1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete caching policy: %v", err) + } + time.Sleep(200 * time.Millisecond) + + // Verify CacheSet call worked + msg, err := w.Ep.ATSManager.CacheSet() + if err != nil { + t.Fatalf("CacheSet failed after delete: %v", err) + } + if msg == "" { + t.Errorf("expected non-empty CacheSet message after delete") + } }