This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 368b0706 Feature/support opa server (#822)
368b0706 is described below
commit 368b07063b25f9d44536ce4cc7f16b72d60029b9
Author: dubbo-go-bot <[email protected]>
AuthorDate: Wed Dec 17 18:06:25 2025 +0800
Feature/support opa server (#822)
* Feature/support opa server
(https://github.com/dubbo-go-pixiu/dubbo-go-pixiu/pull/43)
* support http filter plugins (Open Policy Agent)
* fmt
* Streamlined judgment
* accept the advices
* add wrong example
* style: run go fmt && imports-formatter
* fix the bug of go.mod
* change the key of opa
* update the true sum
* Add OPA DOC
* change the opa.docs for Chinese and English version
* remove docs/sample
* roll back
* modify the README
* modify
* unify the errorcode
* add the test of context
* simplify
* use errors.new to replace fmt.Errof in some cases
* fix api_config
* fix call
* fix jwt
* change interface{} to any
* change all interface{} to any
* fix the logic error in filter\prometheus\metric
* add judgment for client error
* reliable timeout detection over substring matching
* add mock test
* Remove send and sendError to avoid confusion
* Unify the errorcode (#14)
* support http filter plugins (Open Policy Agent)
* fmt
* Streamlined judgment
* accept the advices
* add wrong example
* style: run go fmt && imports-formatter
* fix the bug of go.mod
* change the key of opa
* update the true sum
* Add OPA DOC
* change the opa.docs for Chinese and English version
* remove docs/sample
* roll back
* modify the README
* modify
* unify the errorcode
* add the test of context
* simplify
* use errors.new to replace fmt.Errof in some cases
* fix api_config
* fix call
* fix jwt
* change interface{} to any
* change all interface{} to any
* fix the logic error in filter\prometheus\metric
* add judgment for client error
* reliable timeout detection over substring matching
* add mock test
* Remove send and sendError to avoid confusion
---------
Co-authored-by: Sirui Huang <[email protected]>
* the first version of metricreport
* simplify the metricreporter
* add docs
* chore: fix gofmt formatting issues
* delete nots
* return the change to improve consistent
* upadate the docs
* fix
* improve the use of lock in content
* unify the metric of pull and push
* fix the bug in integration test, prepare for replace
* replace metric, rename the metricrepoter to metric
* avoid the Factory's config may be updated by FilterManage
* update the docs and movew the default to constant
* supprot opa
* Update pkg/common/constant/filter.go
* feat: replace the metric filter by a unified metric filter
(dubbo-go-pixiu/dubbo-go-pixiu/pull/15)
* support http filter plugins (Open Policy Agent)
* fmt
* Streamlined judgment
* accept the advices
* add wrong example
* style: run go fmt && imports-formatter
* fix the bug of go.mod
* change the key of opa
* update the true sum
* Add OPA DOC
* change the opa.docs for Chinese and English version
* remove docs/sample
* roll back
* modify the README
* modify
* unify the errorcode
* add the test of context
* simplify
* use errors.new to replace fmt.Errof in some cases
* fix api_config
* fix call
* fix jwt
* change interface{} to any
* change all interface{} to any
* fix the logic error in filter\prometheus\metric
* add judgment for client error
* reliable timeout detection over substring matching
* add mock test
* Remove send and sendError to avoid confusion
* Unify the errorcode (#14)
* support http filter plugins (Open Policy Agent)
* fmt
* Streamlined judgment
* accept the advices
* add wrong example
* style: run go fmt && imports-formatter
* fix the bug of go.mod
* change the key of opa
* update the true sum
* Add OPA DOC
* change the opa.docs for Chinese and English version
* remove docs/sample
* roll back
* modify the README
* modify
* unify the errorcode
* add the test of context
* simplify
* use errors.new to replace fmt.Errof in some cases
* fix api_config
* fix call
* fix jwt
* change interface{} to any
* change all interface{} to any
* fix the logic error in filter\prometheus\metric
* add judgment for client error
* reliable timeout detection over substring matching
* add mock test
* Remove send and sendError to avoid confusion
---------
Co-authored-by: Sirui Huang <[email protected]>
* the first version of metricreport
* simplify the metricreporter
* add docs
* chore: fix gofmt formatting issues
* delete nots
* return the change to improve consistent
* upadate the docs
* fix
* improve the use of lock in content
* unify the metric of pull and push
* fix the bug in integration test, prepare for replace
* replace metric, rename the metricrepoter to metric
* avoid the Factory's config may be updated by FilterManage
* update the docs and movew the default to constant
* Update pkg/common/constant/filter.go
---------
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
* fix the ci of constant/filter
* fix the ci of constant/filter
* replace the old defaultName
* merge opa
* fmt
* fix(filter): deep copy complex filter configs
(https://github.com/dubbo-go-pixiu/dubbo-go-pixiu/pull/38)
Some Pixiu filters have complex configurations (maps, slices, nested
structs). Passing the factory.cfg pointer directly can cause in-flight requests
to read inconsistent or modified configs when FilterManager.ReLoad() updates
factory instances at runtime, violating the HttpFilterFactory interface
contract.
This commit implements DeepCopy() for complex filter configs and updates
PrepareFilterChain to use it. Each filter instance now has its own independent
config, preventing runtime inconsistencies.
Affected files:
- pkg/filter/auth/jwt/jwt.go
- pkg/filter/authority/authority.go
- pkg/filter/authority/config.go
- pkg/filter/cors/cors.go
- pkg/filter/csrf/csrf.go
- pkg/filter/event/event.go
- pkg/filter/failinject/filter.go
- pkg/filter/http/grpcproxy/grpc.go
- pkg/filter/mcp/mcpserver/filter.go
- pkg/filter/sentinel/circuitbreaker/circuit_breaker.go
- pkg/filter/sentinel/config.go
- pkg/filter/sentinel/ratelimit/rate_limit.go
- pkg/model/mcpserver.go
Signed-off-by: Aetherance <[email protected]>
Co-authored-by: Aetherance <[email protected]>
* fix(filter): shallow copy factory config to avoid pointer sharing
(https://github.com/dubbo-go-pixiu/dubbo-go-pixiu/pull/35)
Shallow copy the factory configuration in filters that previously passed
factory pointers directly. This ensures compliance with the
HttpFilterFactory
interface contract and avoids accidental sharing of config between filters
and the factory.
Affected filters (shallow copy sufficient):
- opa/opa.go
- llm/tokenizer/tokenizer.go
- http/loadbalancer/loadbalancer.go
- host/host.go
- header/header.go
- prometheus/metric
- accesslog/access_log.go
Note: Although prometheus/metric.go uses a nested struct
(MetricCollectConfiguration -> MetricCollectRule), all fields are value
types
(string, bool, int). Therefore, shallow copy is sufficient; deep copy is not
required.
Signed-off-by: Aetherance <[email protected]>
Co-authored-by: Aetherance <[email protected]>
* change the note
---------
Signed-off-by: Aetherance <[email protected]>
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
Co-authored-by: The_INK <[email protected]>
Co-authored-by: Aetherance <[email protected]>
* Update pkg/filter/opa/opa.go
* Apply suggestion from @Alanxtl
---------
Signed-off-by: Aetherance <[email protected]>
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
Co-authored-by: The_INK <[email protected]>
Co-authored-by: Aetherance <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
---
docs/user/filter/opa.md | 316 +++++++++++++++++++++++++++++--
docs/user/filter/opa_CN.md | 314 +++++++++++++++++++++++++++++--
pkg/filter/opa/opa.go | 275 +++++++++++++++++++++++----
pkg/filter/opa/opa_test.go | 460 ++++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 1296 insertions(+), 69 deletions(-)
diff --git a/docs/user/filter/opa.md b/docs/user/filter/opa.md
index a72486b5..b4550293 100644
--- a/docs/user/filter/opa.md
+++ b/docs/user/filter/opa.md
@@ -7,18 +7,58 @@ English | [中文](opa_CN.md)
## English
### Overview
-The `dgp.filter.http.opa` filter delegates authorization decisions to Open
Policy Agent (OPA) via a Rego policy. This filter evaluates requests and
determines whether to allow or deny based on the policy defined in Rego. The
policy is provided as an inline Rego module and evaluated using OPA's built-in
query engine.
+The `dgp.filter.http.opa` filter delegates authorization decisions to Open
Policy Agent (OPA) via a Rego policy. This filter evaluates requests and
determines whether to allow or deny based on the policy defined in Rego.
-### What the filter does (current behavior)
+The filter supports two operation modes:
+- **Server Mode (Recommended)**: Evaluates policies by calling an external OPA
server via HTTP API, supporting centralized policy management and hot updates.
+- **Embedded Mode**: Policies are provided as inline Rego modules and
evaluated using OPA's built-in query engine.
+
+### Operation Modes
+
+#### Server Mode (Recommended)
+- Calls an external OPA server via REST API (`server_url`).
+- Supports centralized policy management and dynamic updates without
restarting Pixiu.
+- Suitable for production environments and large-scale deployments.
+- Simple configuration - just specify OPA server address and decision path.
+
+#### Embedded Mode (Backward Compatible)
- Loads a Rego **module string** from `config.policy`.
- Builds a Rego **query** from `config.entrypoint`.
-- For each incoming request, constructs an `input` object and evaluates the
query.
-- If the query result is `true`, the request is allowed. Otherwise, the
request is denied.
+- Policies are evaluated within the Pixiu process; updates require restart.
+- Suitable for simple scenarios or test environments.
-> There is **no built-in support** for external policy files or URIs, custom
HTTP status codes, or custom error bodies.
+> **Automatic Mode Selection**:
+> - If `server_url` is configured, **Server Mode** is used (priority).
+> - If `server_url` is not configured but `policy` is provided, **Embedded
Mode** is used.
### Configuration schema
-Add the filter under your HTTP connection manager’s `http_filters` list.
+
+#### Server Mode Configuration (Recommended)
+
+```yaml
+filters:
+ - name: dgp.filter.httpconnectionmanager
+ config:
+ route_config:
+ # ... your routes
+ http_filters:
+ - name: dgp.filter.http.opa
+ config:
+ # OPA server address
+ server_url: "http://opa-server:8181"
+ # Decision path (OPA REST API path)
+ decision_path: "/v1/data/http/authz/allow"
+ # Request timeout in milliseconds, default 100ms
+ timeout_ms: 100
+ # Optional: Bearer token authentication
+ # bearer_token: "your-secret-token"
+ # HTTP proxy filter should be after OPA filter
+ - name: dgp.filter.http.proxy
+ config:
+ # ... proxy config
+```
+
+#### Embedded Mode Configuration (Backward Compatible)
```yaml
filters:
@@ -42,16 +82,40 @@ filters:
# HTTP proxy filter should be after OPA filter
- name: dgp.filter.http.proxy
config:
- # ... proxy config
+ # ... proxy config
```
#### Fields
-- **`policy`** *(string, required)*
+**Server Mode Fields:**
+
+- **`server_url`** *(string, required for server mode)*
+ - **Meaning:** OPA server address, e.g., `http://opa-server:8181` or
`https://opa.example.com:8181`.
+ - **Datatype:** `string`.
+ - **Example:** `"http://localhost:8181"`.
+
+- **`decision_path`** *(string, required for server mode)*
+ - **Meaning:** OPA REST API decision path, format:
`/v1/data/<package>/<rule>`.
+ - **Datatype:** `string`.
+ - **Example:** `"/v1/data/http/authz/allow"`.
+
+- **`timeout_ms`** *(integer, optional)*
+ - **Meaning:** Request timeout for OPA server in milliseconds.
+ - **Datatype:** `int`.
+ - **Default:** `100` (100 milliseconds).
+
+- **`bearer_token`** *(string, optional)*
+ - **Meaning:** Bearer token for OPA server authentication.
+ - **Datatype:** `string`.
+ - **Notes:** Configure this field if OPA server requires authentication.
+
+**Embedded Mode Fields (Backward Compatible):**
+
+- **`policy`** *(string, required for embedded mode)*
- **Meaning:** The **Rego module source code** (inline string). Loaded via
`rego.Module("policy.rego", policy)`.
- **Datatype:** `string` (multiline YAML recommended with `|`).
- - **Notes:** File paths or bundle URIs are **not supported**.
-- **`entrypoint`** *(string, required)*
+
+- **`entrypoint`** *(string, required for embedded mode)*
- **Meaning:** The **Rego query string** passed to `rego.Query(...)`. Should
be a valid query like `data.<package>.<rule>` (e.g., `data.http.authz.allow`).
- **Datatype:** `string`.
@@ -78,9 +142,170 @@ input.api # API object (opaque)
input.params # route params map
```
+**Important Note: HTTP Header Canonicalization**
+
+Go's `net/http` package automatically canonicalizes HTTP header names to
**Title-Case** format (e.g., `X-Api-Key`, `Content-Type`). When accessing
headers in your policy, use the canonicalized key names:
+
+- ✅ Correct: `input.headers["X-Api-Key"]` or `input.headers["Content-Type"]`
+- ❌ Incorrect: `input.headers["x-api-key"]` or `input.headers["x_api_key"]`
+
+**Example:**
+```rego
+# Correct header access
+allow {
+ input.headers["X-Api-Key"][0] == "secret"
+ input.headers["Content-Type"][0] == "application/json"
+}
+```
+
+### OPA Server Deployment (Server Mode)
+
+#### Docker Deployment
+
+**1. Start OPA Server**
+
+```bash
+docker run -d \
+ --name opa-server \
+ -p 8181:8181 \
+ openpolicyagent/opa:latest \
+ run --server --addr :8181 --log-level info
+```
+
+**2. Upload Policy to OPA Server**
+
+```bash
+# Create policy file policy.rego
+cat > policy.rego <<EOF
+package http.authz
+
+default allow = false
+
+allow {
+ input.method == "GET"
+ input.path == "/status"
+}
+EOF
+
+# Upload policy
+curl -X PUT http://localhost:8181/v1/policies/authz \
+ -H "Content-Type: text/plain" \
+ --data-binary @policy.rego
+```
+
+**3. Test Policy**
+
+```bash
+curl -X POST http://localhost:8181/v1/data/http/authz/allow \
+ -H "Content-Type: application/json" \
+ -d '{
+ "input": {
+ "method": "GET",
+ "path": "/status"
+ }
+ }'
+```
+
+#### Docker Compose Deployment (Recommended)
+
+```yaml
+version: '3.8'
+
+services:
+ opa-server:
+ image: openpolicyagent/opa:latest
+ container_name: opa-server
+ ports:
+ - "8181:8181"
+ command:
+ - "run"
+ - "--server"
+ - "--addr=:8181"
+ - "--log-level=info"
+ volumes:
+ - ./opa-policies:/policies:ro
+ healthcheck:
+ test: ["CMD", "wget", "--spider", "http://localhost:8181/health"]
+ interval: 10s
+ timeout: 3s
+ retries: 3
+ restart: unless-stopped
+
+ pixiu:
+ image: apache/dubbo-go-pixiu:latest
+ depends_on:
+ opa-server:
+ condition: service_healthy
+ volumes:
+ - ./configs:/configs:ro
+ ports:
+ - "8888:8888"
+```
+
+#### Policy Hot Updates
+
+OPA server supports updating policies at runtime without restart:
+
+```bash
+# Update policy
+curl -X PUT http://localhost:8181/v1/policies/authz \
+ -H "Content-Type: text/plain" \
+ --data-binary @new-policy.rego
+```
+
### Minimal examples
-**1) Allow only GET /status**
+**Server Mode Examples**
+
+**1) Allow only GET /status (using OPA server)**
+
+```yaml
+- name: dgp.filter.http.opa
+ config:
+ server_url: "http://opa-server:8181"
+ decision_path: "/v1/data/http/authz/allow"
+ timeout_ms: 100
+```
+
+Corresponding OPA policy (uploaded to server):
+
+```rego
+package http.authz
+
+default allow = false
+
+allow {
+ input.method == "GET"
+ input.path == "/status"
+}
+```
+
+**2) Allow requests with a specific header value (using OPA server)**
+
+```yaml
+- name: dgp.filter.http.opa
+ config:
+ server_url: "http://opa-server:8181"
+ decision_path: "/v1/data/http/authz/allow"
+ timeout_ms: 100
+ bearer_token: "your-secret-token"
+```
+
+Corresponding OPA policy:
+
+```rego
+package http.authz
+
+default allow = false
+
+allow {
+ input.headers["X-Api-Key"][0] == "secret"
+}
+```
+
+**Embedded Mode Examples**
+
+**1) Allow only GET /status (embedded mode)**
```yaml
- name: dgp.filter.http.opa
@@ -92,7 +317,7 @@ input.params # route params map
entrypoint: "data.http.authz.allow"
```
-**2) Allow requests with a specific header value**
+**2) Allow requests with a specific header value (embedded mode)**
```yaml
- name: dgp.filter.http.opa
@@ -108,12 +333,71 @@ input.params # route params map
### Limitations and notes
-- **Return type must be boolean**: Only `true` allows; objects (e.g., `{allow:
true}`) will not be interpreted specially.
+**Response Format Support**
+
+The OPA filter supports multiple response formats and automatically recognizes
them:
+
+**Format 1: Boolean Value (Simple Cases)**
+```rego
+package http.authz
+default allow = false
+allow {
+ input.method == "GET"
+}
+```
+
+**Format 2: Object with "allow" Field (OPA Common Pattern, Recommended)**
+```rego
+package http.authz
+
+default decision = {"allow": false}
+
+decision = {"allow": true, "reason": "admin user"} {
+ input.headers["X-Role"][0] == "admin"
+}
+```
+
+**Format 3: Object with "result" Field**
+```rego
+package http.authz
+
+decision = {"result": true, "metadata": {"user": "alice"}} {
+ input.headers["X-Api-Key"][0] == "secret"
+}
+```
+
+The filter extracts the decision using the following priority:
+1. Direct boolean value → use that value
+2. Object's `allow` field → use the boolean value of `allow`
+3. Object's `result` field → use the boolean value of `result`
+4. Unrecognized format → default to deny (returns 403)
+
+---
+
+**Server Mode:**
+- **Network Latency**: Introduces network latency due to external OPA server
calls (typically 5-50ms).
+- **OPA Server Availability**: Ensure OPA server high availability; recommend
deploying multiple instances.
+- **Timeout Configuration**: Default timeout of 100ms may be too tight in
cross-service/cloud environments. Consider adjusting to 300-500ms for
production based on network latency.
+
+**Embedded Mode:**
- **No custom deny status/body**: The filter does not map policy outputs to
HTTP status or body.
- **Module-only loading**: Policies are loaded from the inline `policy` string
only.
+- **Policy updates require restart**: Modifying policies requires restarting
Pixiu service.
### Troubleshooting
-- **Denied unexpectedly**: Confirm the query is correct (e.g.,
`data.http.authz.allow`), and that the policy returns **`true`** for the given
`input`.
-- **Policy compile errors**: Validate the Rego module with `opa eval` locally
before embedding.
-- **Nil/empty results**: Re-check access to `headers`/`query` (they are maps
of lists), and confirm path/method match.
\ No newline at end of file
+**Server Mode:**
+- **Connection Failed**: Check `server_url` configuration, ensure OPA server
is running and accessible.
+- **Timeout Errors**: Increase `timeout_ms` value, or check OPA server
performance.
+- **Authentication Failed**: If OPA server requires authentication, ensure
`bearer_token` is correctly configured.
+- **Decision Path Error**: Ensure `decision_path` matches the policy path in
OPA server.
+- **Denied Unexpectedly**: Use `curl` to directly test OPA server's decision
API to verify policy logic.
+
+**Embedded Mode:**
+- **Denied Unexpectedly**: Confirm the query is correct (e.g.,
`data.http.authz.allow`), and that the policy returns **`true`** for the given
`input`.
+- **Policy Compile Errors**: Validate the Rego module with `opa eval` locally
before embedding.
+- **Nil/Empty Results**: Re-check access to `headers`/`query` (they are maps
of lists), and confirm path/method match.
+
+**General Troubleshooting:**
+- **Check Logs**: Pixiu logs detailed OPA evaluation information, including
errors and decision results.
+- **Test Policy**: Test the policy first in OPA server or locally using `opa
eval` to ensure the logic is correct.
\ No newline at end of file
diff --git a/docs/user/filter/opa_CN.md b/docs/user/filter/opa_CN.md
index 7a679c98..2009e929 100644
--- a/docs/user/filter/opa_CN.md
+++ b/docs/user/filter/opa_CN.md
@@ -7,18 +7,58 @@
## 中文
### 概述
-`dgp.filter.http.opa` 过滤器通过 Rego 策略将授权决策委托给 Open Policy Agent (OPA)。该过滤器评估每个
HTTP 请求并根据 Rego 策略决定是否允许或拒绝请求。策略通过内联 Rego 模块加载,并使用 OPA 的内置查询引擎进行评估。
+`dgp.filter.http.opa` 过滤器通过 Rego 策略将授权决策委托给 Open Policy Agent (OPA)。该过滤器评估每个
HTTP 请求并根据 Rego 策略决定是否允许或拒绝请求。
-### 实际行为
+过滤器支持两种运行模式:
+- **服务器模式(推荐)**:通过 HTTP API 调用独立的 OPA 服务器进行策略评估,支持集中式策略管理和热更新。
+- **嵌入模式**:策略通过内联 Rego 模块加载,并使用 OPA 的内置查询引擎进行评估。
+
+### 运行模式
+
+#### 服务器模式(推荐)
+- 通过 REST API 调用外部 OPA 服务器 (`server_url`)。
+- 支持集中式策略管理和动态更新,无需重启 Pixiu。
+- 适合生产环境和大规模部署。
+- 配置简单,只需指定 OPA 服务器地址和决策路径。
+
+#### 嵌入模式(向后兼容)
- 从配置项 `policy` 读取 **Rego 模块源码字符串**。
- 从配置项 `entrypoint` 读取 **Rego 查询字符串**。
-- 每次请求构造 `input` 对象(见下),并评估该查询。
-- 如果查询结果为 `true`,则放行请求;否则拒绝请求。
+- 策略在 Pixiu 进程内评估,更新策略需要重启。
+- 适合简单场景或测试环境。
-> 目前过滤器**不支持**:外部文件或 URI 加载、自定义拒绝状态码或返回自定义错误体等。
+> **自动模式选择**:
+> - 如果配置了 `server_url`,则使用**服务器模式**(优先)。
+> - 如果未配置 `server_url` 但配置了 `policy`,则使用**嵌入模式**。
### 配置结构
-将过滤器添加到 HTTP 连接管理器的 `http_filters` 列表中:
+
+#### 服务器模式配置(推荐)
+
+```yaml
+filters:
+ - name: dgp.filter.httpconnectionmanager
+ config:
+ route_config:
+ # ... 你的路由
+ http_filters:
+ - name: dgp.filter.http.opa
+ config:
+ # OPA 服务器地址
+ server_url: "http://opa-server:8181"
+ # 决策路径(OPA REST API 路径)
+ decision_path: "/v1/data/http/authz/allow"
+ # 请求超时时间(毫秒),默认 100ms
+ timeout_ms: 100
+ # 可选:Bearer Token 认证
+ # bearer_token: "your-secret-token"
+ # HTTP proxy 过滤器应该在 OPA 过滤器之后
+ - name: dgp.filter.http.proxy
+ config:
+ # ... proxy config
+```
+
+#### 嵌入模式配置(向后兼容)
```yaml
filters:
@@ -39,21 +79,45 @@ filters:
input.path == "/status"
}
entrypoint: "data.http.authz.allow"
- # HTTP proxy 过滤器应该在OPA 过滤器之后
+ # HTTP proxy 过滤器应该在 OPA 过滤器之后
- name: dgp.filter.http.proxy
config:
- # ... proxy config
+ # ... proxy config
```
#### 字段说明
-- **`policy`**(字符串,必填)
+**服务器模式字段:**
+
+- **`server_url`**(字符串,服务器模式必填)
+ - **含义:** OPA 服务器地址,如 `http://opa-server:8181` 或
`https://opa.example.com:8181`。
+ - **数据类型:** `string`。
+ - **示例:** `"http://localhost:8181"`。
+
+- **`decision_path`**(字符串,服务器模式必填)
+ - **含义:** OPA REST API 决策路径,格式为 `/v1/data/<package>/<rule>`。
+ - **数据类型:** `string`。
+ - **示例:** `"/v1/data/http/authz/allow"`。
+
+- **`timeout_ms`**(整数,可选)
+ - **含义:** 请求 OPA 服务器的超时时间(毫秒)。
+ - **数据类型:** `int`。
+ - **默认值:** `100`(100 毫秒)。
+
+- **`bearer_token`**(字符串,可选)
+ - **含义:** 用于 OPA 服务器认证的 Bearer Token。
+ - **数据类型:** `string`。
+ - **说明:** 如果 OPA 服务器需要认证,可配置此字段。
+
+**嵌入模式字段(向后兼容):**
+
+- **`policy`**(字符串,嵌入模式必填)
- **含义:** **Rego 模块源码**(内联字符串)。通过 `rego.Module("policy.rego", policy)` 加载。
- **数据类型:** `string`(建议使用 YAML 多行格式 `|`)。
- - **说明:** 当前版本不支持外部文件路径或 bundle URI。
-- **`entrypoint`**(字符串,必填)
+
+- **`entrypoint`**(字符串,嵌入模式必填)
- **含义:** 传给 `rego.Query(...)` 的 **Rego 查询字符串**,应为合法查询,如
`data.<package>.<rule>`(如 `data.http.authz.allow`)。
- **数据类型:** `string`。
@@ -80,9 +144,170 @@ input.api # API 对象(结构可能变化)
input.params # 路由参数 map
```
+**重要提示:HTTP Header 规范化**
+
+Go 的 `net/http` 包会自动将 HTTP header 名称规范化为 **Title-Case** 格式(如
`X-Api-Key`、`Content-Type`)。在策略中访问 headers 时,请使用规范化后的键名:
+
+- ✅ 正确:`input.headers["X-Api-Key"]` 或 `input.headers["Content-Type"]`
+- ❌ 错误:`input.headers["x-api-key"]` 或 `input.headers["x_api_key"]`
+
+**示例:**
+```rego
+# 正确的 header 访问方式
+allow {
+ input.headers["X-Api-Key"][0] == "secret"
+ input.headers["Content-Type"][0] == "application/json"
+}
+```
+
+### OPA 服务器部署(服务器模式)
+
+#### Docker 部署
+
+**1. 启动 OPA 服务器**
+
+```bash
+docker run -d \
+ --name opa-server \
+ -p 8181:8181 \
+ openpolicyagent/opa:latest \
+ run --server --addr :8181 --log-level info
+```
+
+**2. 上传策略到 OPA 服务器**
+
+```bash
+# 创建策略文件 policy.rego
+cat > policy.rego <<EOF
+package http.authz
+
+default allow = false
+
+allow {
+ input.method == "GET"
+ input.path == "/status"
+}
+EOF
+
+# 上传策略
+curl -X PUT http://localhost:8181/v1/policies/authz \
+ -H "Content-Type: text/plain" \
+ --data-binary @policy.rego
+```
+
+**3. 测试策略**
+
+```bash
+curl -X POST http://localhost:8181/v1/data/http/authz/allow \
+ -H "Content-Type: application/json" \
+ -d '{
+ "input": {
+ "method": "GET",
+ "path": "/status"
+ }
+ }'
+```
+
+#### Docker Compose 部署(推荐)
+
+```yaml
+version: '3.8'
+
+services:
+ opa-server:
+ image: openpolicyagent/opa:latest
+ container_name: opa-server
+ ports:
+ - "8181:8181"
+ command:
+ - "run"
+ - "--server"
+ - "--addr=:8181"
+ - "--log-level=info"
+ volumes:
+ - ./opa-policies:/policies:ro
+ healthcheck:
+ test: ["CMD", "wget", "--spider", "http://localhost:8181/health"]
+ interval: 10s
+ timeout: 3s
+ retries: 3
+ restart: unless-stopped
+
+ pixiu:
+ image: apache/dubbo-go-pixiu:latest
+ depends_on:
+ opa-server:
+ condition: service_healthy
+ volumes:
+ - ./configs:/configs:ro
+ ports:
+ - "8888:8888"
+```
+
+#### 策略热更新
+
+OPA 服务器支持在运行时更新策略,无需重启:
+
+```bash
+# 更新策略
+curl -X PUT http://localhost:8181/v1/policies/authz \
+ -H "Content-Type: text/plain" \
+ --data-binary @new-policy.rego
+```
+
### 最小可用示例
-**1)仅允许 GET /status**
+**服务器模式示例**
+
+**1)仅允许 GET /status(使用 OPA 服务器)**
+
+```yaml
+- name: dgp.filter.http.opa
+ config:
+ server_url: "http://opa-server:8181"
+ decision_path: "/v1/data/http/authz/allow"
+ timeout_ms: 100
+```
+
+对应的 OPA 策略(上传到服务器):
+
+```rego
+package http.authz
+
+default allow = false
+
+allow {
+ input.method == "GET"
+ input.path == "/status"
+}
+```
+
+**2)基于请求头校验(使用 OPA 服务器)**
+
+```yaml
+- name: dgp.filter.http.opa
+ config:
+ server_url: "http://opa-server:8181"
+ decision_path: "/v1/data/http/authz/allow"
+ timeout_ms: 100
+ bearer_token: "your-secret-token"
+```
+
+对应的 OPA 策略:
+
+```rego
+package http.authz
+
+default allow = false
+
+allow {
+ input.headers["X-Api-Key"][0] == "secret"
+}
+```
+
+**嵌入模式示例**
+
+**1)仅允许 GET /status(嵌入模式)**
```yaml
- name: dgp.filter.http.opa
@@ -94,7 +319,7 @@ input.params # 路由参数 map
entrypoint: "data.http.authz.allow"
```
-**2)基于请求头校验**
+**2)基于请求头校验(嵌入模式)**
```yaml
- name: dgp.filter.http.opa
@@ -110,12 +335,71 @@ input.params # 路由参数 map
### 限制与说明
-- **返回类型必须是布尔值**:只有 `true` 会被视为放行;对象(如 `{allow: true}`)不会被特殊处理。
+**返回格式支持**
+
+OPA 策略支持多种返回格式,过滤器会自动识别:
+
+**格式 1:布尔值(简单场景)**
+```rego
+package http.authz
+default allow = false
+allow {
+ input.method == "GET"
+}
+```
+
+**格式 2:对象格式 - allow 字段(OPA 常见习惯,推荐)**
+```rego
+package http.authz
+
+default decision = {"allow": false}
+
+decision = {"allow": true, "reason": "admin user"} {
+ input.headers["X-Role"][0] == "admin"
+}
+```
+
+**格式 3:对象格式 - result 字段**
+```rego
+package http.authz
+
+decision = {"result": true, "metadata": {"user": "alice"}} {
+ input.headers["X-Api-Key"][0] == "secret"
+}
+```
+
+过滤器会按以下优先级提取决策结果:
+1. 直接布尔值 → 使用该值
+2. 对象的 `allow` 字段 → 使用 `allow` 的布尔值
+3. 对象的 `result` 字段 → 使用 `result` 的布尔值
+4. 无法识别 → 默认拒绝(返回 403)
+
+---
+
+**服务器模式:**
+- **网络延迟**:由于需要调用外部 OPA 服务器,会引入网络延迟(通常 5-50ms)。
+- **OPA 服务器可用性**:需要确保 OPA 服务器高可用,建议部署多实例。
+- **超时配置**:默认超时 100ms 可能在跨服务/云环境下偏紧,生产环境建议根据网络延迟调整(如 300-500ms)。
+
+**嵌入模式:**
- **没有自定义拒绝响应**:过滤器不会将策略输出映射到 HTTP 状态码或响应体。
- **仅支持内联模块加载**:策略来自配置字符串,不读取外部文件。
+- **策略更新需重启**:修改策略后需要重启 Pixiu 服务。
### 故障排查
+**服务器模式:**
+- **连接失败**:检查 `server_url` 配置是否正确,确保 OPA 服务器正在运行并可访问。
+- **超时错误**:增加 `timeout_ms` 值,或检查 OPA 服务器性能。
+- **认证失败**:如果 OPA 服务器需要认证,确保 `bearer_token` 配置正确。
+- **决策路径错误**:确保 `decision_path` 与 OPA 服务器中的策略路径匹配。
+- **意外拒绝**:使用 `curl` 直接测试 OPA 服务器的决策 API,验证策略逻辑。
+
+**嵌入模式:**
- **意外拒绝**:检查查询是否正确(如 `data.http.authz.allow`),并确保策略在给定的 `input` 下返回
**`true`**。
- **策略编译错误**:在嵌入策略之前,先使用 `opa eval` 本地验证 Rego 语法。
-- **空结果或类型不符**:请检查 `headers`/`query`,确保路径和方法匹配。
\ No newline at end of file
+- **空结果或类型不符**:请检查 `headers`/`query`,确保路径和方法匹配。
+
+**通用排查:**
+- **查看日志**:Pixiu 会记录 OPA 评估的详细日志,包括错误信息和决策结果。
+- **测试策略**:先在 OPA 服务器或本地使用 `opa eval` 测试策略,确保逻辑正确。
\ No newline at end of file
diff --git a/pkg/filter/opa/opa.go b/pkg/filter/opa/opa.go
index 1d6fca3f..100d611c 100644
--- a/pkg/filter/opa/opa.go
+++ b/pkg/filter/opa/opa.go
@@ -18,7 +18,13 @@
package opa
import (
+ "bytes"
+ "context"
+ "encoding/json"
"fmt"
+ "io"
+ "net/http"
+ "time"
)
import (
@@ -28,7 +34,7 @@ import (
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
- "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)
@@ -44,18 +50,28 @@ type (
Plugin struct{}
FilterFactory struct {
- cfg *Config
- rego *rego.Rego
+ cfg *Config
+ rego *rego.Rego // For embedded mode
(deprecated, kept for backward compat)
+ preparedQuery *rego.PreparedEvalQuery // Pre-compiled query for
embedded mode
+ httpClient *http.Client // For server mode
}
Filter struct {
cfg *Config
- preparedQuery *rego.PreparedEvalQuery
+ preparedQuery *rego.PreparedEvalQuery // For embedded mode
+ httpClient *http.Client // For server mode
}
Config struct {
- Policy string `yaml:"policy" json:"policy" `
- Entrypoint string `yaml:"entrypoint" json:"entrypoint" `
+ // Server mode configuration (recommended for production)
+ ServerURL string `yaml:"server_url" json:"server_url"
mapstructure:"server_url"` // OPA Server address, e.g.,
http://opa-server:8181
+ DecisionPath string `yaml:"decision_path" json:"decision_path"
mapstructure:"decision_path"` // Decision path, e.g., /v1/data/http/authz/allow
+ TimeoutMs int `yaml:"timeout_ms" json:"timeout_ms"
mapstructure:"timeout_ms"` // Request timeout in milliseconds, default
100
+ BearerToken string `yaml:"bearer_token" json:"bearer_token"
mapstructure:"bearer_token"` // Optional authentication token
+
+ // Embedded mode configuration (for backward compatibility)
+ Policy string `yaml:"policy" json:"policy"
mapstructure:"policy"` // Policy content
+ Entrypoint string `yaml:"entrypoint" json:"entrypoint"
mapstructure:"entrypoint"` // Policy entrypoint
}
)
@@ -73,46 +89,89 @@ func (factory *FilterFactory) Config() any {
// Apply is called after the configuration is loaded and is used to prepare
the OPA query.
func (factory *FilterFactory) Apply() error {
- policy := factory.cfg.Policy
- if policy == "" {
- return fmt.Errorf("OPA policy is empty in the configuration")
+ cfg := factory.cfg
+
+ // Server mode (recommended for production)
+ if cfg.ServerURL != "" {
+ if cfg.DecisionPath == "" {
+ return fmt.Errorf("decision_path is required when using
OPA server mode")
+ }
+
+ timeout := 100 * time.Millisecond
+ if cfg.TimeoutMs > 0 {
+ timeout = time.Duration(cfg.TimeoutMs) *
time.Millisecond
+ }
+
+ factory.httpClient = &http.Client{
+ Timeout: timeout,
+ Transport: &http.Transport{
+ MaxIdleConns: 100,
+ MaxIdleConnsPerHost: 10,
+ IdleConnTimeout: 90 * time.Second,
+ },
+ }
+
+ logger.Infof("OPA filter initialized in server mode: %s%s
(timeout: %v)", cfg.ServerURL, cfg.DecisionPath, timeout)
+ return nil
}
- r := rego.New(
- rego.Query(factory.cfg.Entrypoint),
- rego.Module("policy.rego", policy),
- )
+ // Embedded mode (for backward compatibility)
+ if cfg.Policy != "" {
+ logger.Warnf("OPA filter using embedded mode. Consider using
server mode for better maintainability and performance.")
- factory.rego = r
+ if cfg.Entrypoint == "" {
+ return fmt.Errorf("entrypoint is required when using
embedded mode")
+ }
- return nil
+ r := rego.New(
+ rego.Query(cfg.Entrypoint),
+ rego.Module("policy.rego", cfg.Policy),
+ )
+
+ // Pre-compile the query once at initialization for better
performance
+ preparedQuery, err := r.PrepareForEval(context.Background())
+ if err != nil {
+ return fmt.Errorf("failed to prepare OPA query: %w",
err)
+ }
+
+ factory.rego = r
+ factory.preparedQuery = &preparedQuery
+
+ logger.Infof("OPA filter initialized in embedded mode")
+ return nil
+ }
+
+ return fmt.Errorf("OPA filter requires either server_url (recommended)
or policy configuration")
}
// PrepareFilterChain prepares the filter chain for a new request by
dynamically creating a Filter
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain
filter.FilterChain) error {
- if factory.rego == nil {
- return fmt.Errorf("rego instance not initialized in factory")
- }
+func (factory *FilterFactory) PrepareFilterChain(ctx *contextHttp.HttpContext,
chain filter.FilterChain) error {
+ // Shallow copy cfg (copy the struct value; inner reference fields
remain shared)
+ cfgCopy := *factory.cfg
+ var f *Filter
- preparedQuery, err := factory.rego.PrepareForEval(ctx.Ctx)
- if err != nil {
- return fmt.Errorf("failed to prepare OPA query: %w", err)
+ // Server mode (priority)
+ if factory.httpClient != nil {
+ f = &Filter{
+ cfg: &cfgCopy,
+ httpClient: factory.httpClient,
+ }
+ } else if factory.preparedQuery != nil {
+ // Embedded mode (backward compatibility) - reuse pre-compiled
query
+ f = &Filter{
+ cfg: &cfgCopy,
+ preparedQuery: factory.preparedQuery,
+ }
+ } else {
+ return fmt.Errorf("OPA filter not properly initialized")
}
- // Make a shallow copy of the factory config to avoid sharing the
factory's pointer.
- cfgCopy := *factory.cfg
- f := &Filter{cfg: &cfgCopy, preparedQuery: &preparedQuery}
chain.AppendDecodeFilters(f)
return nil
}
// Decode is the core logic of the filter. It converts HTTP request data into
a standard OPA input format and evaluates the policy.
-func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
- if f.preparedQuery == nil {
- logger.Error("OPA filter not initialized properly.")
- return filter.Stop
- }
-
+func (f *Filter) Decode(c *contextHttp.HttpContext) filter.FilterStatus {
input := map[string]any{
"method": c.Request.Method,
"path": c.Request.URL.Path,
@@ -127,15 +186,165 @@ func (f *Filter) Decode(c *http.HttpContext)
filter.FilterStatus {
"params": c.Params,
}
+ // Server mode (priority)
+ if f.httpClient != nil {
+ return f.evaluateServer(c, input)
+ }
+
+ // Embedded mode (backward compatibility)
+ if f.preparedQuery != nil {
+ return f.evaluateEmbedded(c, input)
+ }
+
+ logger.Error("OPA filter not initialized properly")
+ errResp := contextHttp.InternalError.WithError(fmt.Errorf("OPA filter
not initialized"))
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+}
+
+// evaluateEmbedded evaluates the policy using embedded OPA engine
+func (f *Filter) evaluateEmbedded(c *contextHttp.HttpContext, input
map[string]any) filter.FilterStatus {
results, err := f.preparedQuery.Eval(c.Ctx, rego.EvalInput(input))
if err != nil {
- logger.Error("OPA evaluation error: %v\n", err)
+ logger.Errorf("OPA embedded evaluation error: %v", err)
+ errResp := contextHttp.InternalError.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
return filter.Stop
}
- if len(results) == 0 || results[0].Expressions[0].Value != true {
+ if len(results) == 0 {
+ logger.Debugf("OPA embedded policy returned empty result for
request: %s %s", input["method"], input["path"])
+ errResp := contextHttp.Forbidden.New()
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ // Extract decision from result (supports both boolean and object
formats)
+ allow := extractDecision(results[0].Expressions[0].Value)
+ if !allow {
+ logger.Debugf("OPA embedded policy denied request: %s %s",
input["method"], input["path"])
+ errResp := contextHttp.Forbidden.New()
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
return filter.Stop
}
return filter.Continue
}
+
+// evaluateServer evaluates the policy by calling OPA server REST API
+func (f *Filter) evaluateServer(c *contextHttp.HttpContext, input
map[string]any) filter.FilterStatus {
+ // Construct request body according to OPA REST API specification
+ requestBody := map[string]any{
+ "input": input,
+ }
+
+ jsonData, err := json.Marshal(requestBody)
+ if err != nil {
+ logger.Errorf("Failed to marshal OPA request: %v", err)
+ errResp := contextHttp.InternalError.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ // Construct HTTP request
+ url := f.cfg.ServerURL + f.cfg.DecisionPath
+ req, err := http.NewRequestWithContext(c.Ctx, "POST", url,
bytes.NewBuffer(jsonData))
+ if err != nil {
+ logger.Errorf("Failed to create OPA request: %v", err)
+ errResp := contextHttp.InternalError.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ if f.cfg.BearerToken != "" {
+ req.Header.Set("Authorization", "Bearer "+f.cfg.BearerToken)
+ }
+
+ // Send request to OPA server
+ resp, err := f.httpClient.Do(req)
+ if err != nil {
+ logger.Errorf("OPA server request failed: %v", err)
+ // Check if it's a timeout error using net.Error interface
+ if netErr, ok := err.(interface{ Timeout() bool }); ok &&
netErr.Timeout() {
+ errResp := contextHttp.GatewayTimeout.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ } else {
+ errResp := contextHttp.ServiceUnavailable.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ }
+ return filter.Stop
+ }
+ defer resp.Body.Close()
+
+ // Check HTTP status code
+ if resp.StatusCode != 200 {
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ logger.Errorf("Failed to read OPA server response body:
%v", err)
+ body = []byte("")
+ }
+ logger.Errorf("OPA server returned status %d: %s",
resp.StatusCode, string(body))
+ errResp := contextHttp.BadGateway.WithError(fmt.Errorf("OPA
server returned status %d", resp.StatusCode))
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ // Parse response according to OPA REST API specification
+ var result map[string]any
+ if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+ logger.Errorf("Failed to decode OPA response: %v", err)
+ errResp := contextHttp.BadGateway.WithError(err)
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ // Extract decision from result (supports both boolean and object
formats)
+ if resultValue, exists := result["result"]; exists {
+ allow := extractDecision(resultValue)
+ if !allow {
+ logger.Debugf("OPA server policy denied request: %s
%s", input["method"], input["path"])
+ errResp := contextHttp.Forbidden.New()
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+ return filter.Continue
+ }
+
+ logger.Errorf("Invalid OPA response format: missing 'result' field")
+ errResp := contextHttp.BadGateway.WithError(fmt.Errorf("missing
'result' field in OPA response"))
+ c.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+}
+
+// extractDecision extracts the allow decision from OPA result
+// Supports multiple formats:
+// 1. Boolean: true/false
+// 2. Object with "allow" field: {allow: true}
+// 3. Object with "result" field: {result: true}
+func extractDecision(value any) bool {
+ // Format 1: Direct boolean value
+ if boolVal, ok := value.(bool); ok {
+ return boolVal
+ }
+
+ // Format 2 & 3: Object with allow/result field
+ if objVal, ok := value.(map[string]any); ok {
+ // Try "allow" field first (common OPA pattern)
+ if allowVal, exists := objVal["allow"]; exists {
+ if boolVal, ok := allowVal.(bool); ok {
+ return boolVal
+ }
+ }
+ // Try "result" field as fallback
+ if resultVal, exists := objVal["result"]; exists {
+ if boolVal, ok := resultVal.(bool); ok {
+ return boolVal
+ }
+ }
+ }
+
+ // Default to deny if format is unrecognized
+ logger.Warnf("Unrecognized OPA decision format, defaulting to deny.
Value: %v", value)
+ return false
+}
diff --git a/pkg/filter/opa/opa_test.go b/pkg/filter/opa/opa_test.go
index c26e802b..98a50824 100644
--- a/pkg/filter/opa/opa_test.go
+++ b/pkg/filter/opa/opa_test.go
@@ -19,8 +19,11 @@ package opa
import (
"context"
+ "encoding/json"
+ "net/http"
"net/http/httptest"
"testing"
+ "time"
)
import (
@@ -31,7 +34,7 @@ import (
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
- "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
)
const testPolicy = `
@@ -65,13 +68,14 @@ func setupFilterWithoutFile(t *testing.T, policy string)
*Filter {
}
}
-func TestAllowedRule(t *testing.T) {
+// TestEmbeddedAllowedRule tests embedded mode with allowed request
+func TestEmbeddedAllowedRule(t *testing.T) {
f := setupFilterWithoutFile(t, testPolicy)
req := httptest.NewRequest("GET", "/test", nil)
req.Header.Set("Test_Header", "1")
rec := httptest.NewRecorder()
- ctx := &http.HttpContext{
+ ctx := &contextHttp.HttpContext{
Writer: rec,
Request: req,
Ctx: context.Background(),
@@ -81,13 +85,14 @@ func TestAllowedRule(t *testing.T) {
assert.Equal(t, filter.Continue, result)
}
-func TestDeniedRule(t *testing.T) {
+// TestEmbeddedDeniedRule tests embedded mode with denied request
+func TestEmbeddedDeniedRule(t *testing.T) {
f := setupFilterWithoutFile(t, testPolicy)
req := httptest.NewRequest("GET", "/test", nil)
req.Header.Set("Test_Header", "0")
rec := httptest.NewRecorder()
- ctx := &http.HttpContext{
+ ctx := &contextHttp.HttpContext{
Writer: rec,
Request: req,
Ctx: context.Background(),
@@ -96,3 +101,448 @@ func TestDeniedRule(t *testing.T) {
result := f.Decode(ctx)
assert.Equal(t, filter.Stop, result)
}
+
+// TestServerModeAllowed tests server mode with allowed request
+func TestServerModeAllowed(t *testing.T) {
+ // Create mock OPA server
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ assert.Equal(t, "POST", r.Method)
+ assert.Equal(t, "application/json",
r.Header.Get("Content-Type"))
+ assert.Equal(t, "/v1/data/test/allow", r.URL.Path)
+
+ // Read and verify request body
+ var reqBody map[string]any
+ err := json.NewDecoder(r.Body).Decode(&reqBody)
+ assert.Nil(t, err)
+ assert.NotNil(t, reqBody["input"])
+
+ input := reqBody["input"].(map[string]any)
+
+ // Simulate policy: Test_Header == "1" allows
+ // After JSON unmarshaling, headers is map[string][]string
+ // Note: HTTP header keys are canonicalized (e.g.,
"Test_Header" -> "Test_header")
+ allow := false
+ if headersMap, ok := input["headers"].(map[string]any); ok {
+ // Check for "Test_header" (canonicalized form)
+ if testHeader, ok := headersMap["Test_header"]; ok {
+ if headerArray, ok := testHeader.([]any); ok &&
len(headerArray) > 0 {
+ if strVal, ok :=
headerArray[0].(string); ok {
+ allow = strVal == "1"
+ }
+ }
+ }
+ }
+
+ // Return OPA response format
+ response := map[string]any{
+ "result": allow,
+ }
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(response)
+ }))
+ defer server.Close()
+
+ // Create FilterFactory with server mode configuration
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 100,
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+ assert.NotNil(t, factory.httpClient)
+
+ // Prepare filter chain
+ req := httptest.NewRequest("GET", "/test", nil)
+ req.Header.Set("Test_Header", "1")
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ assert.Nil(t, err)
+ assert.Len(t, chain.filters, 1)
+
+ // Execute filter
+ result := chain.filters[0].Decode(ctx)
+ assert.Equal(t, filter.Continue, result)
+}
+
+// TestServerModeDenied tests server mode with denied request
+func TestServerModeDenied(t *testing.T) {
+ // Create mock OPA server
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ var reqBody map[string]any
+ json.NewDecoder(r.Body).Decode(&reqBody)
+ input := reqBody["input"].(map[string]any)
+
+ // After JSON unmarshaling, headers is map[string][]string
+ // Note: HTTP header keys are canonicalized
+ allow := false
+ if headersMap, ok := input["headers"].(map[string]any); ok {
+ if testHeader, ok := headersMap["Test_header"]; ok {
+ if headerArray, ok := testHeader.([]any); ok &&
len(headerArray) > 0 {
+ if strVal, ok :=
headerArray[0].(string); ok {
+ allow = strVal == "1"
+ }
+ }
+ }
+ }
+
+ response := map[string]any{
+ "result": allow,
+ }
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(response)
+ }))
+ defer server.Close()
+
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 100,
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+
+ req := httptest.NewRequest("GET", "/test", nil)
+ req.Header.Set("Test_Header", "0") // This will be denied
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ assert.Nil(t, err)
+
+ result := chain.filters[0].Decode(ctx)
+ assert.Equal(t, filter.Stop, result)
+}
+
+// TestServerModeWithBearerToken tests server mode with authentication token
+func TestServerModeWithBearerToken(t *testing.T) {
+ expectedToken := "test-token-123"
+
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ // Verify authentication header
+ authHeader := r.Header.Get("Authorization")
+ assert.Equal(t, "Bearer "+expectedToken, authHeader)
+
+ response := map[string]any{
+ "result": true,
+ }
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(response)
+ }))
+ defer server.Close()
+
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 100,
+ BearerToken: expectedToken,
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+
+ req := httptest.NewRequest("GET", "/test", nil)
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ factory.PrepareFilterChain(ctx, chain)
+ result := chain.filters[0].Decode(ctx)
+ assert.Equal(t, filter.Continue, result)
+}
+
+// TestServerModeError tests error handling in server mode
+func TestServerModeError(t *testing.T) {
+ // Create a server that returns error
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusInternalServerError)
+ w.Write([]byte("Internal server error"))
+ }))
+ defer server.Close()
+
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 100,
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+
+ req := httptest.NewRequest("GET", "/test", nil)
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ factory.PrepareFilterChain(ctx, chain)
+ result := chain.filters[0].Decode(ctx)
+ assert.Equal(t, filter.Stop, result) // Should deny on error
+}
+
+// TestServerModeTimeout tests timeout handling in server mode
+func TestServerModeTimeout(t *testing.T) {
+ // Create a server that delays response
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ // Sleep longer than the timeout
+ time.Sleep(200 * time.Millisecond)
+ response := map[string]any{
+ "result": true,
+ }
+ json.NewEncoder(w).Encode(response)
+ }))
+ defer server.Close()
+
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 50, // Very short timeout to trigger
timeout
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+
+ req := httptest.NewRequest("GET", "/test", nil)
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ factory.PrepareFilterChain(ctx, chain)
+ result := chain.filters[0].Decode(ctx)
+
+ // Should return Stop on timeout
+ assert.Equal(t, filter.Stop, result)
+
+ // Check that the response contains timeout error (504)
+ assert.Equal(t, 504, ctx.GetStatusCode())
+}
+
+// TestServerModeObjectResponse tests server mode with object response format
+func TestServerModeObjectResponse(t *testing.T) {
+ // Test object response with "allow" field
+ server := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ var reqBody map[string]any
+ json.NewDecoder(r.Body).Decode(&reqBody)
+ input := reqBody["input"].(map[string]any)
+
+ headersMap := input["headers"].(map[string]any)
+ testHeaderValue := ""
+ if testHeader, ok := headersMap["Test_header"]; ok {
+ if headerArray, ok := testHeader.([]any); ok &&
len(headerArray) > 0 {
+ if strVal, ok := headerArray[0].(string); ok {
+ testHeaderValue = strVal
+ }
+ }
+ }
+
+ // Return object format: {allow: true}
+ response := map[string]any{
+ "result": map[string]any{
+ "allow": testHeaderValue == "1",
+ "reason": "test policy",
+ },
+ }
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(response)
+ }))
+ defer server.Close()
+
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: server.URL,
+ DecisionPath: "/v1/data/test/allow",
+ TimeoutMs: 100,
+ },
+ }
+
+ err := factory.Apply()
+ assert.Nil(t, err)
+
+ // Test allow case
+ req := httptest.NewRequest("GET", "/test", nil)
+ req.Header.Set("Test_Header", "1")
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ factory.PrepareFilterChain(ctx, chain)
+ result := chain.filters[0].Decode(ctx)
+ assert.Equal(t, filter.Continue, result)
+
+ // Test deny case
+ req2 := httptest.NewRequest("GET", "/test", nil)
+ req2.Header.Set("Test_Header", "0")
+ rec2 := httptest.NewRecorder()
+ ctx2 := &contextHttp.HttpContext{
+ Writer: rec2,
+ Request: req2,
+ Ctx: context.Background(),
+ }
+
+ chain2 := &mockFilterChain{}
+ factory.PrepareFilterChain(ctx2, chain2)
+ result2 := chain2.filters[0].Decode(ctx2)
+ assert.Equal(t, filter.Stop, result2)
+}
+
+// TestEmbeddedObjectResponse tests embedded mode with object response format
+func TestEmbeddedObjectResponse(t *testing.T) {
+ // Policy that returns object with allow field
+ objectPolicy := `
+package test
+import future.keywords.if
+
+allow if {
+ input.headers[Test_Header][0] == "1"
+}
+
+decision := {
+ "allow": allow,
+ "reason": "test policy"
+}
+`
+
+ r := rego.New(
+ rego.Query("data.test.decision"),
+ rego.Module("policy.rego", objectPolicy),
+ )
+
+ preparedQuery, err := r.PrepareForEval(context.Background())
+ assert.Nil(t, err)
+
+ f := &Filter{
+ cfg: &Config{
+ Policy: objectPolicy,
+ Entrypoint: "data.test.decision",
+ },
+ preparedQuery: &preparedQuery,
+ }
+
+ // Test allow case
+ req := httptest.NewRequest("GET", "/test", nil)
+ req.Header.Set("Test_Header", "1")
+ rec := httptest.NewRecorder()
+ ctx := &contextHttp.HttpContext{
+ Writer: rec,
+ Request: req,
+ Ctx: context.Background(),
+ }
+
+ result := f.Decode(ctx)
+ assert.Equal(t, filter.Continue, result)
+
+ // Test deny case
+ req2 := httptest.NewRequest("GET", "/test", nil)
+ req2.Header.Set("Test_Header", "0")
+ rec2 := httptest.NewRecorder()
+ ctx2 := &contextHttp.HttpContext{
+ Writer: rec2,
+ Request: req2,
+ Ctx: context.Background(),
+ }
+
+ result2 := f.Decode(ctx2)
+ assert.Equal(t, filter.Stop, result2)
+}
+
+// TestConfigValidation tests configuration validation
+func TestConfigValidation(t *testing.T) {
+ // Test missing decision_path in server mode
+ factory := &FilterFactory{
+ cfg: &Config{
+ ServerURL: "http://localhost:8181",
+ },
+ }
+ err := factory.Apply()
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), "decision_path is required")
+
+ // Test missing policy in embedded mode
+ factory2 := &FilterFactory{
+ cfg: &Config{
+ Entrypoint: "data.test.allow",
+ },
+ }
+ err2 := factory2.Apply()
+ assert.NotNil(t, err2)
+ assert.Contains(t, err2.Error(), "server_url")
+
+ // Test missing entrypoint in embedded mode
+ factory3 := &FilterFactory{
+ cfg: &Config{
+ Policy: "package test",
+ },
+ }
+ err3 := factory3.Apply()
+ assert.NotNil(t, err3)
+ assert.Contains(t, err3.Error(), "entrypoint is required")
+}
+
+// mockFilterChain is a mock implementation of filter.FilterChain for testing
+type mockFilterChain struct {
+ filters []filter.HttpDecodeFilter
+}
+
+func (m *mockFilterChain) AppendDecodeFilters(f ...filter.HttpDecodeFilter) {
+ m.filters = append(m.filters, f...)
+}
+
+func (m *mockFilterChain) AppendEncodeFilters(f ...filter.HttpEncodeFilter) {
+ // Not needed for testing
+}
+
+func (m *mockFilterChain) OnDecode(ctx *contextHttp.HttpContext) {
+ // Not needed for testing
+}
+
+func (m *mockFilterChain) OnEncode(ctx *contextHttp.HttpContext) {
+ // Not needed for testing
+}
+
+// Backward compatibility test names
+func TestAllowedRule(t *testing.T) {
+ TestEmbeddedAllowedRule(t)
+}
+
+func TestDeniedRule(t *testing.T) {
+ TestEmbeddedDeniedRule(t)
+}