Copilot commented on code in PR #896:
URL: https://github.com/apache/ranger/pull/896#discussion_r3003994157


##########
pdp/src/test/java/org/apache/ranger/pdp/security/RangerPdpAuthFilterTest.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.security;
+
+import org.junit.jupiter.api.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RangerPdpAuthFilterTest {
+    @Test
+    public void testInit_skipsHeaderHandlerWhenDisabled() {
+        RangerPdpAuthFilter filter = new RangerPdpAuthFilter();
+        Map<String, String> params = new HashMap<>();
+
+        params.put(RangerPdpAuthFilter.PARAM_AUTH_TYPES, "header");
+        params.put(RangerPdpAuthFilter.PARAM_HEADER_AUTHN_ENABLED, "false");
+
+        assertThrows(ServletException.class, () -> filter.init(new 
TestFilterConfig(params)));
+    }

Review Comment:
   Test name `testInit_skipsHeaderHandlerWhenDisabled` is a bit misleading: the 
assertion expects `filter.init(...)` to throw, which happens because skipping 
the only configured handler leaves zero handlers and causes init to fail. 
Consider renaming the test to reflect the expected behavior (e.g., init fails 
when only `header` is configured but `header.enabled=false`).



##########
dev-support/ranger-docker/README.md:
##########
@@ -111,7 +111,7 @@ Similarly, check the `depends` section of the 
`docker-compose.ranger-service.yam
 #### Bring up all containers
 ~~~
 ./scripts/ozone/ozone-plugin-docker-setup.sh
-docker compose -f docker-compose.ranger.yml -f 
docker-compose.ranger-usersync.yml -f docker-compose.ranger-tagsync.yml -f 
docker-compose.ranger-kms.yml -f docker-compose.ranger-hadoop.yml -f 
docker-compose.ranger-hbase.yml -f docker-compose.ranger-kafka.yml -f 
docker-compose.ranger-hive.yml -f docker-compose.ranger-knox.yml -f 
docker-compose.ranger-ozone.yml up -d
+docker compose -f docker-compose.ranger.yml -f 
docker-compose.ranger-usersync.yml -f docker-compose.ranger-tagsync.yml -f 
docker-compose.ranger-pdp.xml -f docker-compose.ranger-kms.yml -f 
docker-compose.ranger-hadoop.yml -f docker-compose.ranger-hbase.yml -f 
docker-compose.ranger-kafka.yml -f docker-compose.ranger-hive.yml -f 
docker-compose.ranger-knox.yml -f docker-compose.ranger-ozone.yml up -d
 ~~~

Review Comment:
   This second compose command also references `docker-compose.ranger-pdp.xml`, 
but the compose file committed in this PR is `docker-compose.ranger-pdp.yml`. 
Please update the README so copy/paste works.



##########
dev-support/ranger-docker/README.md:
##########
@@ -74,14 +74,14 @@ cd dev-support/ranger-docker
 
 ### Run Ranger Services in Containers
 
-#### Bring up ranger-core services: ranger, usersync, tagsync and ranger-kms 
in containers
+#### Bring up ranger-core services: ranger, usersync, tagsync, pdp and 
ranger-kms in containers
 ~~~
 # To enable file based sync source for usersync do:
 # export ENABLE_FILE_SYNC_SOURCE=true
 
 # valid values for RANGER_DB_TYPE: mysql/postgres/oracle
 
-docker compose -f docker-compose.ranger.yml -f 
docker-compose.ranger-usersync.yml -f docker-compose.ranger-tagsync.yml -f 
docker-compose.ranger-kms.yml up -d
+docker compose -f docker-compose.ranger.yml -f 
docker-compose.ranger-usersync.yml -f docker-compose.ranger-tagsync.yml -f 
docker-compose.ranger-pdp.xml -f docker-compose.ranger-kms.yml up -d

Review Comment:
   The docker compose command references `docker-compose.ranger-pdp.xml`, but 
the added file is `docker-compose.ranger-pdp.yml`. This typo will cause the 
documented command to fail. Update the filename/extension in the README to 
match the actual compose file.
   ```suggestion
   docker compose -f docker-compose.ranger.yml -f 
docker-compose.ranger-usersync.yml -f docker-compose.ranger-tagsync.yml -f 
docker-compose.ranger-pdp.yml -f docker-compose.ranger-kms.yml up -d
   ```



##########
pdp/src/main/java/org/apache/ranger/pdp/config/RangerPdpConstants.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.config;
+
+public final class RangerPdpConstants {
+    private RangerPdpConstants() {
+        // no instances
+    }
+
+    // Servlet context attributes
+    public static final String SERVLET_CTX_ATTR_CONFIG        = 
"ranger.pdp.config";
+    public static final String SERVLET_CTX_ATTR_AUTHORIZER    = 
"ranger.pdp.authorizer";
+    public static final String SERVLET_CTX_ATTR_RUNTIME_STATE = 
"ranger.pdp.runtime.state";
+
+    // Request attributes set by auth filter
+    public static final String ATTR_AUTHENTICATED_USER = 
"ranger.pdp.authenticated.user";
+    public static final String ATTR_AUTH_TYPE          = 
"ranger.pdp.auth.type";
+
+    // Server
+    public static final String PROP_CONF_DIR = "ranger.pdp.conf.dir";
+    public static final String PROP_PORT    = "ranger.pdp.port";
+    public static final String PROP_LOG_DIR = "ranger.pdp.log.dir";
+
+    // SSL/TLS
+    public static final String PROP_SSL_ENABLED             = 
"ranger.pdp.ssl.enabled";
+    public static final String PROP_SSL_KEYSTORE_FILE       = 
"ranger.pdp.ssl.keystore.file";
+    public static final String PROP_SSL_KEYSTORE_PASSWORD   = 
"ranger.pdp.ssl.keystore.password";
+    public static final String PROP_SSL_KEYSTORE_TYPE       = 
"ranger.pdp.ssl.keystore.type";
+    public static final String PROP_SSL_TRUSTSTORE_ENABLED  = 
"ranger.pdp.ssl.truststore.enabled";
+    public static final String PROP_SSL_TRUSTSTORE_FILE     = 
"ranger.pdp.ssl.truststore.file";
+    public static final String PROP_SSL_TRUSTSTORE_PASSWORD = 
"ranger.pdp.ssl.truststore.password";
+    public static final String PROP_SSL_TRUSTSTORE_TYPE     = 
"ranger.pdp.ssl.truststore.type";
+
+    // HTTP/2
+    public static final String PROP_HTTP2_ENABLED = "ranger.pdp.http2.enabled";
+
+    // HTTP connector limits
+    public static final String PROP_HTTP_CONNECTOR_MAX_THREADS       = 
"ranger.pdp.http.connector.maxThreads";
+    public static final String PROP_HTTP_CONNECTOR_MIN_SPARE_THREADS = 
"ranger.pdp.http.connector.minSpareThreads";
+    public static final String PROP_HTTP_CONNECTOR_ACCEPT_COUNT      = 
"ranger.pdp.http.connector.acceptCount";
+    public static final String PROP_HTTP_CONNECTOR_MAX_CONNECTIONS   = 
"ranger.pdp.http.connector.maxConnections";
+
+    // Authentication types
+    public static final String PROP_AUTH_TYPES = "ranger.pdp.auth.types";
+
+    // HTTP header auth
+    public static final String PROP_HEADER_AUTHN_ENABLED  = 
"ranger.pdp.authn.header.enabled";
+    public static final String PROP_HEADER_AUTHN_USERNAME = 
"ranger.pdp.authn.header.username";
+
+    // JWT auth
+    public static final String PROP_JWT_PROVIDER_URL = 
"ranger.pdp.jwt.provider.url";
+    public static final String PROP_JWT_PUBLIC_KEY   = 
"ranger.pdp.jwt.public.key";
+    public static final String PROP_JWT_COOKIE_NAME  = 
"ranger.pdp.jwt.cookie.name";
+    public static final String PROP_JWT_AUDIENCES    = 
"ranger.pdp.jwt.audiences";
+
+    // Kerberos/SPNEGO auth
+    public static final String PROP_SPNEGO_PRINCIPAL   = 
"ranger.pdp.kerberos.spnego.principal";
+    public static final String PROP_SPNEGO_KEYTAB      = 
"ranger.pdp.kerberos.spnego.keytab";
+    public static final String PROP_KRB_NAME_RULES     = 
"hadoop.security.auth_to_local";
+    public static final String PROP_KRB_TOKEN_VALIDITY = 
"ranger.pdp.kerberos.token.valid.seconds";
+    public static final String PROP_KRB_COOKIE_DOMAIN  = 
"ranger.pdp.kerberos.cookie.domain";
+    public static final String PROP_KRB_COOKIE_PATH    = 
"ranger.pdp.kerberos.cookie.path";
+
+    // Authorizer/audit properties referenced by PDP code
+    public static final String PROP_AUTHZ_POLICY_CACHE_DIR = 
"ranger.authz.default.policy.cache.dir";
+    public static final String PROP_AUTHZ_PREFIX           = "ranger.authz.";
+    public static final String PROP_PDP_PREFIX             = "ranger.pdp.";
+    public static final String PROP_PDP_SERVICE_PREFIX     = PROP_PDP_PREFIX + 
"service.";
+    public static final String PROP_SPNEGO_PREFIX          = "ranger.spnego.";
+    public static final String PROP_HADOOP_SECURITY_PREFIX = 
"hadoop.security.";
+
+    // delegation users
+    public static final String PROP_SUFFIX_DELEGATION_USERS = 
".delegation.users";
+    public static final String WILDCARD_SERVICE_NAME        = "*";

Review Comment:
   `PROP_SUFFIX_DELEGATION_USERS` is defined as `.delegation.users`, but the 
default config templates and REST Javadoc refer to `.allowed.users`. This 
mismatch will prevent delegation/allowed-caller config from being loaded unless 
users guess the correct suffix. Consider renaming this constant (and related 
variable names) or switching the suffix to `.allowed.users` to match the 
documented config keys (and update docker configs/tests accordingly).



##########
pdp/src/main/resources/ranger-pdp-default.xml:
##########
@@ -0,0 +1,392 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>ranger.pdp.port</name>
+    <value>6500</value>
+    <description>Port the PDP server listens on.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.log.dir</name>
+    <value>/var/log/ranger/pdp</value>
+    <description>Directory for PDP server log files.</description>
+  </property>
+
+  <!-- SSL/TLS -->
+  <property>
+    <name>ranger.pdp.ssl.enabled</name>
+    <value>false</value>
+    <description>Set to true to enable HTTPS.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.file</name>
+    <value/>
+    <description>Path to the keystore file (required when SSL is 
enabled).</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.password</name>
+    <value/>
+    <description>Keystore password.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.type</name>
+    <value>JKS</value>
+    <description>Keystore type (JKS, PKCS12).</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.enabled</name>
+    <value>false</value>
+    <description>Set to true to require client certificate 
authentication.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.file</name>
+    <value/>
+    <description>Path to the truststore file.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.password</name>
+    <value/>
+    <description>Truststore password.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.type</name>
+    <value>JKS</value>
+    <description>Truststore type (JKS, PKCS12).</description>
+  </property>
+
+  <!-- HTTP/2 -->
+  <property>
+    <name>ranger.pdp.http2.enabled</name>
+    <value>true</value>
+    <description>
+      Enable HTTP/2 via upgrade on the connector.
+      Supports both h2 (over TLS) and h2c (cleartext upgrade) alongside 
HTTP/1.1.
+    </description>
+  </property>
+
+  <!-- Connector concurrency / queue limits -->
+  <property>
+    <name>ranger.pdp.http.connector.maxThreads</name>
+    <value>200</value>
+    <description>Maximum number of worker threads handling simultaneous 
requests.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.minSpareThreads</name>
+    <value>20</value>
+    <description>Minimum number of spare worker threads kept 
ready.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.acceptCount</name>
+    <value>100</value>
+    <description>Queued connection backlog when all worker threads are 
busy.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.maxConnections</name>
+    <value>10000</value>
+    <description>Maximum concurrent TCP connections accepted by the 
connector.</description>
+  </property>
+
+  <!-- ================================================================
+       Authentication for incoming REST requests
+       Comma-separated list of methods tried in the listed order.
+       Supported values: header, jwt, kerberos
+       ================================================================ -->
+  <property>
+    <name>ranger.pdp.auth.types</name>
+    <value>header,jwt,kerberos</value>
+    <description>
+      Comma-separated list of authentication methods for incoming REST 
requests,
+      tried in listed order. Supported values: header, jwt, kerberos.
+    </description>
+  </property>

Review Comment:
   `ranger.pdp.auth.types` defaults to `header,jwt,kerberos`, but the default 
JWT/SPNEGO configuration values in this same file are empty. 
`JwtAuthHandler`/`KerberosAuthHandler` initialization will throw when required 
settings are missing, which prevents the server from starting with the 
out-of-the-box defaults. Consider changing the default auth types to only 
include handlers that can be initialized with the provided defaults (or provide 
non-empty defaults / skip handlers when unconfigured).



##########
pdp/src/main/java/org/apache/ranger/pdp/rest/RangerPdpREST.java:
##########
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.rest;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ranger.authz.api.RangerAuthorizer;
+import org.apache.ranger.authz.api.RangerAuthzException;
+import org.apache.ranger.authz.model.RangerAccessInfo;
+import org.apache.ranger.authz.model.RangerAuthzRequest;
+import org.apache.ranger.authz.model.RangerAuthzResult;
+import org.apache.ranger.authz.model.RangerMultiAuthzRequest;
+import org.apache.ranger.authz.model.RangerMultiAuthzResult;
+import org.apache.ranger.authz.model.RangerResourceInfo;
+import org.apache.ranger.authz.model.RangerResourcePermissions;
+import org.apache.ranger.authz.model.RangerResourcePermissionsRequest;
+import org.apache.ranger.authz.model.RangerUserInfo;
+import org.apache.ranger.pdp.RangerPdpStats;
+import org.apache.ranger.pdp.config.RangerPdpConfig;
+import org.apache.ranger.pdp.config.RangerPdpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_PDP_SERVICE_PREFIX;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_SUFFIX_DELEGATION_USERS;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.WILDCARD_SERVICE_NAME;
+
+/**
+ * REST resource that exposes the three core {@link RangerAuthorizer} methods 
over HTTP.
+ *
+ * <p>All endpoints are under {@code /authz/v1} and produce/consume {@code 
application/json}.
+ * Authentication is enforced upstream by {@link RangerPdpAuthFilter}; the 
authenticated
+ * caller's identity is read from the {@link 
RangerPdpConstants#ATTR_AUTHENTICATED_USER}
+ * request attribute.
+ *
+ * <table border="1">
+ *   <tr><th>Method</th><th>Path</th><th>Request body</th><th>Response 
body</th></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorize</td>
+ *       <td>{@link RangerAuthzRequest}</td><td>{@link 
RangerAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorizeMulti</td>
+ *       <td>{@link RangerMultiAuthzRequest}</td><td>{@link 
RangerMultiAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/permissions</td>
+ *       <td>{@link RangerResourcePermissionsRequest}</td>
+ *       <td>{@link RangerResourcePermissions}</td></tr>
+ * </table>
+ */
+@Path("/v1")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Singleton
+public class RangerPdpREST {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerPdpREST.class);
+
+    private static final Response RESPONSE_OK = Response.ok().build();
+
+    private final Map<String, Set<String>> delegationUsersByService = new 
HashMap<>();
+
+    @Inject
+    private RangerAuthorizer authorizer;
+
+    @Inject
+    private RangerPdpConfig config;
+
+    @Context
+    private ServletContext servletContext;
+
+    @PostConstruct
+    public void initialize() {
+        initializeDelegationUsers();
+    }
+
+    /**
+     * Evaluates a single access request.
+     *
+     * @param request the authorization request
+     * @return {@code 200 OK} with {@link RangerAuthzResult}, or {@code 400} / 
{@code 500} on error
+     */
+    @POST
+    @Path("/authorize")
+    public Response authorize(RangerAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String           requestId   = request != null ? 
request.getRequestId() : null;
+            String           caller      = getAuthenticatedUser(httpRequest);
+            String           serviceName = getServiceName(request);
+            RangerUserInfo   user        = request != null ? request.getUser() 
: null;
+            RangerAccessInfo access      = request != null ? 
request.getAccess() : null;
+
+            LOG.debug("==> authorize(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, access, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerAuthzResult result = authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorize(requestId={}): authorization error; 
caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorize(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorize(requestId={}, caller={}, serviceName={}): 
ret={}", requestId, caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Evaluates multiple access requests in a single call.
+     *
+     * @param request the multi-access authorization request
+     * @return {@code 200 OK} with {@link RangerMultiAuthzResult}, or {@code 
400} / {@code 500} on error
+     */
+    @POST
+    @Path("/authorizeMulti")
+    public Response authorizeMulti(RangerMultiAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String                 requestId   = request != null ? 
request.getRequestId() : null;
+            String                 caller      = 
getAuthenticatedUser(httpRequest);
+            String                 serviceName = getServiceName(request);
+            RangerUserInfo         user        = request != null ? 
request.getUser() : null;
+            List<RangerAccessInfo> accesses    = request != null ? 
request.getAccesses() : null;
+
+            LOG.debug("==> authorizeMulti(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, accesses, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerMultiAuthzResult result = 
authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorizeMulti(requestId={}): authorization 
error; caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorizeMulti(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorizeMulti(requestId={}, caller={}, 
serviceName={}): ret={}", requestId, caller, serviceName, ret != null ? 
ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the effective permissions for a resource, broken down by 
user/group/role.
+     *
+     * @param request wrapper containing the resource info and access context
+     * @return {@code 200 OK} with {@link RangerResourcePermissions}, or 
{@code 400} / {@code 500} on error
+     */
+    @POST
+    @Path("/permissions")
+    public Response getResourcePermissions(RangerResourcePermissionsRequest 
request, @Context HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String caller      = getAuthenticatedUser(httpRequest);
+            String serviceName = getServiceName(request);
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={})", 
caller, serviceName);
+
+            ret = validateCaller(caller, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerResourcePermissions result = 
authorizer.getResourcePermissions(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("getResourcePermissions(): validation error; 
caller={}", caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("getResourcePermissions(): unexpected error; 
caller={}", caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={}): 
ret={}", caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {

Review Comment:
   The exit log line uses the same `==>` prefix as the entry log 
(`LOG.debug("==> getResourcePermissions...")`). For consistency with the other 
methods (and to make logs easier to scan), this looks like it should be `<== 
getResourcePermissions(...)`.



##########
pdp/conf.dist/ranger-pdp-site.xml:
##########
@@ -0,0 +1,299 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Copy this file to conf/ranger-pdp-site.xml and edit to override defaults. 
-->
+
+<configuration>
+
+  <!-- ================================================================
+       PDP Server
+       ================================================================ -->
+
+  <property>
+    <name>ranger.pdp.port</name>
+    <value>6500</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.log.dir</name>
+    <value>/var/log/ranger/pdp</value>
+  </property>
+
+  <!-- ================================================================
+       SSL/TLS
+       ================================================================ -->
+
+  <property>
+    <name>ranger.pdp.ssl.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.file</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.password</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.type</name>
+    <value>JKS</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.file</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.password</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.type</name>
+    <value>JKS</value>
+  </property>
+
+  <!-- ================================================================
+       HTTP/2
+       ================================================================ -->
+
+  <property>
+    <name>ranger.pdp.http2.enabled</name>
+    <value>true</value>
+  </property>
+
+  <!-- Connector concurrency / queue limits -->
+  <property>
+    <name>ranger.pdp.http.connector.maxThreads</name>
+    <value>200</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.minSpareThreads</name>
+    <value>20</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.acceptCount</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.maxConnections</name>
+    <value>10000</value>
+  </property>
+
+  <!-- ================================================================
+       Authentication for incoming REST requests
+       Comma-separated list of methods tried in listed order.
+       Supported values: header, jwt, kerberos
+       ================================================================ -->
+
+  <property>
+    <name>ranger.pdp.auth.types</name>
+    <value>header,jwt,kerberos</value>
+  </property>
+
+  <!-- HTTP Header authentication -->
+
+  <property>
+    <name>ranger.pdp.authn.header.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.authn.header.username</name>
+    <value>X-Forwarded-User</value>
+  </property>
+
+  <!-- JWT bearer token authentication -->
+
+  <property>
+    <name>ranger.pdp.jwt.provider.url</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.public.key</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.cookie.name</name>
+    <value>hadoop-jwt</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.audiences</name>
+    <value/>
+  </property>
+
+  <!-- Kerberos / SPNEGO authentication -->
+
+  <property>
+    <name>ranger.pdp.kerberos.spnego.principal</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.spnego.keytab</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>hadoop.security.auth_to_local</name>
+    <value>DEFAULT</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.token.valid.seconds</name>
+    <value>30</value>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.cookie.domain</name>
+    <value/>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.cookie.path</name>
+    <value>/</value>
+  </property>
+
+  <!--
+    Allowed callers for PDP REST APIs, per serviceName in request context.
+    Format:
+      ranger.pdp.service.<serviceName>.allowed.users=user1,user2
+      ranger.pdp.service.*.allowed.users=user3,user4
+    If no such entries are configured, this validation is disabled.
+  -->
+  <property>
+    <name>ranger.pdp.service.*.allowed.users</name>

Review Comment:
   The conf.dist template describes allowed-caller keys as 
`ranger.pdp.service.*.allowed.users`, but the PDP implementation currently 
loads keys ending in `.delegation.users`. As-is, users copying this template 
into `conf/` will configure a property that PDP never reads. Align the template 
property name(s) with the code/constants (or vice versa).
   ```suggestion
         ranger.pdp.service.<serviceName>.delegation.users=user1,user2
         ranger.pdp.service.*.delegation.users=user3,user4
       If no such entries are configured, this validation is disabled.
     -->
     <property>
       <name>ranger.pdp.service.*.delegation.users</name>
   ```



##########
pdp/src/main/resources/ranger-pdp-default.xml:
##########
@@ -0,0 +1,392 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>ranger.pdp.port</name>
+    <value>6500</value>
+    <description>Port the PDP server listens on.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.log.dir</name>
+    <value>/var/log/ranger/pdp</value>
+    <description>Directory for PDP server log files.</description>
+  </property>
+
+  <!-- SSL/TLS -->
+  <property>
+    <name>ranger.pdp.ssl.enabled</name>
+    <value>false</value>
+    <description>Set to true to enable HTTPS.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.file</name>
+    <value/>
+    <description>Path to the keystore file (required when SSL is 
enabled).</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.password</name>
+    <value/>
+    <description>Keystore password.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.keystore.type</name>
+    <value>JKS</value>
+    <description>Keystore type (JKS, PKCS12).</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.enabled</name>
+    <value>false</value>
+    <description>Set to true to require client certificate 
authentication.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.file</name>
+    <value/>
+    <description>Path to the truststore file.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.password</name>
+    <value/>
+    <description>Truststore password.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.ssl.truststore.type</name>
+    <value>JKS</value>
+    <description>Truststore type (JKS, PKCS12).</description>
+  </property>
+
+  <!-- HTTP/2 -->
+  <property>
+    <name>ranger.pdp.http2.enabled</name>
+    <value>true</value>
+    <description>
+      Enable HTTP/2 via upgrade on the connector.
+      Supports both h2 (over TLS) and h2c (cleartext upgrade) alongside 
HTTP/1.1.
+    </description>
+  </property>
+
+  <!-- Connector concurrency / queue limits -->
+  <property>
+    <name>ranger.pdp.http.connector.maxThreads</name>
+    <value>200</value>
+    <description>Maximum number of worker threads handling simultaneous 
requests.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.minSpareThreads</name>
+    <value>20</value>
+    <description>Minimum number of spare worker threads kept 
ready.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.acceptCount</name>
+    <value>100</value>
+    <description>Queued connection backlog when all worker threads are 
busy.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.http.connector.maxConnections</name>
+    <value>10000</value>
+    <description>Maximum concurrent TCP connections accepted by the 
connector.</description>
+  </property>
+
+  <!-- ================================================================
+       Authentication for incoming REST requests
+       Comma-separated list of methods tried in the listed order.
+       Supported values: header, jwt, kerberos
+       ================================================================ -->
+  <property>
+    <name>ranger.pdp.auth.types</name>
+    <value>header,jwt,kerberos</value>
+    <description>
+      Comma-separated list of authentication methods for incoming REST 
requests,
+      tried in listed order. Supported values: header, jwt, kerberos.
+    </description>
+  </property>
+
+  <!-- HTTP Header authentication
+       Consistent with: ranger.admin.authn.header.* in security-admin -->
+  <property>
+    <name>ranger.pdp.authn.header.enabled</name>
+    <value>false</value>
+    <description>
+      Enable trusted HTTP header authentication. Use only behind a trusted 
proxy.
+    </description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.authn.header.username</name>
+    <value>X-Forwarded-User</value>
+    <description>HTTP header name from which the authenticated username is 
read.</description>
+  </property>
+
+  <!-- JWT bearer token authentication
+       PDP validates tokens presented as "Authorization: Bearer token" or via 
cookie.
+       This is distinct from Knox SSO (browser redirect flow); no SSO redirect 
is performed. -->
+  <property>
+    <name>ranger.pdp.jwt.provider.url</name>
+    <value/>
+    <description>URL of the JWT provider (used to fetch the public 
key).</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.public.key</name>
+    <value/>
+    <description>Base64-encoded public key for verifying JWT 
signatures.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.cookie.name</name>
+    <value>hadoop-jwt</value>
+    <description>Cookie name from which a JWT bearer token may be 
read.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.jwt.audiences</name>
+    <value/>
+    <description>Comma-separated list of accepted JWT audiences. Empty means 
any audience is accepted.</description>
+  </property>
+
+  <!-- Kerberos / SPNEGO authentication for incoming REST requests -->
+  <property>
+    <name>ranger.pdp.kerberos.spnego.principal</name>
+    <value/>
+    <description>Kerberos service principal for SPNEGO authentication, e.g. 
HTTP/host@REALM.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.spnego.keytab</name>
+    <value/>
+    <description>Path to the keytab file for the SPNEGO service 
principal.</description>
+  </property>
+
+  <property>
+    <name>hadoop.security.auth_to_local</name>
+    <value>DEFAULT</value>
+    <description>Rules for mapping Kerberos principal names to short 
usernames.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.token.valid.seconds</name>
+    <value>30</value>
+    <description>Validity period (seconds) for Kerberos authentication 
tokens.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.cookie.domain</name>
+    <value/>
+    <description>Cookie domain used for the Kerberos authentication 
cookie.</description>
+  </property>
+
+  <property>
+    <name>ranger.pdp.kerberos.cookie.path</name>
+    <value>/</value>
+    <description>Cookie path used for the Kerberos authentication 
cookie.</description>
+  </property>
+
+  <!--
+    Allowed callers for PDP REST APIs, per serviceName in request context.
+    Format:
+      ranger.pdp.service.<serviceName>.allowed.users=user1,user2
+      ranger.pdp.service.*.allowed.users=user3,user4
+    If no such entries are configured, this validation is disabled.
+  -->
+  <property>
+    <name>ranger.pdp.service.*.allowed.users</name>
+    <value/>
+    <description>Comma-separated users allowed to invoke PDP APIs for all 
services.</description>
+  </property>

Review Comment:
   The default config documents the allowed-caller keys as 
`ranger.pdp.service.*.allowed.users`, but the PDP code currently reads 
`ranger.pdp.service.<service>.delegation.users` (suffix `.delegation.users`). 
As written, these defaults won't be picked up by 
`RangerPdpREST.initializeDelegationUsers()`. Update the property name(s) here 
(or update the code/constants) so the shipped defaults and the implementation 
agree.



##########
pdp/src/main/java/org/apache/ranger/pdp/rest/RangerPdpREST.java:
##########
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.rest;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ranger.authz.api.RangerAuthorizer;
+import org.apache.ranger.authz.api.RangerAuthzException;
+import org.apache.ranger.authz.model.RangerAccessInfo;
+import org.apache.ranger.authz.model.RangerAuthzRequest;
+import org.apache.ranger.authz.model.RangerAuthzResult;
+import org.apache.ranger.authz.model.RangerMultiAuthzRequest;
+import org.apache.ranger.authz.model.RangerMultiAuthzResult;
+import org.apache.ranger.authz.model.RangerResourceInfo;
+import org.apache.ranger.authz.model.RangerResourcePermissions;
+import org.apache.ranger.authz.model.RangerResourcePermissionsRequest;
+import org.apache.ranger.authz.model.RangerUserInfo;
+import org.apache.ranger.pdp.RangerPdpStats;
+import org.apache.ranger.pdp.config.RangerPdpConfig;
+import org.apache.ranger.pdp.config.RangerPdpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_PDP_SERVICE_PREFIX;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_SUFFIX_DELEGATION_USERS;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.WILDCARD_SERVICE_NAME;
+
+/**
+ * REST resource that exposes the three core {@link RangerAuthorizer} methods 
over HTTP.
+ *
+ * <p>All endpoints are under {@code /authz/v1} and produce/consume {@code 
application/json}.
+ * Authentication is enforced upstream by {@link RangerPdpAuthFilter}; the 
authenticated
+ * caller's identity is read from the {@link 
RangerPdpConstants#ATTR_AUTHENTICATED_USER}
+ * request attribute.
+ *
+ * <table border="1">
+ *   <tr><th>Method</th><th>Path</th><th>Request body</th><th>Response 
body</th></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorize</td>
+ *       <td>{@link RangerAuthzRequest}</td><td>{@link 
RangerAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorizeMulti</td>
+ *       <td>{@link RangerMultiAuthzRequest}</td><td>{@link 
RangerMultiAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/permissions</td>
+ *       <td>{@link RangerResourcePermissionsRequest}</td>
+ *       <td>{@link RangerResourcePermissions}</td></tr>
+ * </table>
+ */
+@Path("/v1")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Singleton
+public class RangerPdpREST {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerPdpREST.class);
+
+    private static final Response RESPONSE_OK = Response.ok().build();
+
+    private final Map<String, Set<String>> delegationUsersByService = new 
HashMap<>();
+
+    @Inject
+    private RangerAuthorizer authorizer;
+
+    @Inject
+    private RangerPdpConfig config;
+
+    @Context
+    private ServletContext servletContext;
+
+    @PostConstruct
+    public void initialize() {
+        initializeDelegationUsers();
+    }
+
+    /**
+     * Evaluates a single access request.
+     *
+     * @param request the authorization request
+     * @return {@code 200 OK} with {@link RangerAuthzResult}, or {@code 400} / 
{@code 500} on error
+     */
+    @POST
+    @Path("/authorize")
+    public Response authorize(RangerAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String           requestId   = request != null ? 
request.getRequestId() : null;
+            String           caller      = getAuthenticatedUser(httpRequest);
+            String           serviceName = getServiceName(request);
+            RangerUserInfo   user        = request != null ? request.getUser() 
: null;
+            RangerAccessInfo access      = request != null ? 
request.getAccess() : null;
+
+            LOG.debug("==> authorize(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, access, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerAuthzResult result = authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorize(requestId={}): authorization error; 
caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorize(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorize(requestId={}, caller={}, serviceName={}): 
ret={}", requestId, caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Evaluates multiple access requests in a single call.
+     *
+     * @param request the multi-access authorization request
+     * @return {@code 200 OK} with {@link RangerMultiAuthzResult}, or {@code 
400} / {@code 500} on error
+     */
+    @POST
+    @Path("/authorizeMulti")
+    public Response authorizeMulti(RangerMultiAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String                 requestId   = request != null ? 
request.getRequestId() : null;
+            String                 caller      = 
getAuthenticatedUser(httpRequest);
+            String                 serviceName = getServiceName(request);
+            RangerUserInfo         user        = request != null ? 
request.getUser() : null;
+            List<RangerAccessInfo> accesses    = request != null ? 
request.getAccesses() : null;
+
+            LOG.debug("==> authorizeMulti(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, accesses, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerMultiAuthzResult result = 
authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorizeMulti(requestId={}): authorization 
error; caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorizeMulti(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorizeMulti(requestId={}, caller={}, 
serviceName={}): ret={}", requestId, caller, serviceName, ret != null ? 
ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the effective permissions for a resource, broken down by 
user/group/role.
+     *
+     * @param request wrapper containing the resource info and access context
+     * @return {@code 200 OK} with {@link RangerResourcePermissions}, or 
{@code 400} / {@code 500} on error
+     */
+    @POST
+    @Path("/permissions")
+    public Response getResourcePermissions(RangerResourcePermissionsRequest 
request, @Context HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String caller      = getAuthenticatedUser(httpRequest);
+            String serviceName = getServiceName(request);
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={})", 
caller, serviceName);
+
+            ret = validateCaller(caller, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerResourcePermissions result = 
authorizer.getResourcePermissions(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("getResourcePermissions(): validation error; 
caller={}", caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("getResourcePermissions(): unexpected error; 
caller={}", caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={}): 
ret={}", caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    private String getAuthenticatedUser(HttpServletRequest httpRequest) {
+        Object user = 
httpRequest.getAttribute(RangerPdpConstants.ATTR_AUTHENTICATED_USER);
+
+        return user != null ? user.toString() : null;
+    }
+
+    private static String getServiceName(RangerAuthzRequest request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private static String getServiceName(RangerMultiAuthzRequest request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private static String getServiceName(RangerResourcePermissionsRequest 
request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private Response validateCaller(String caller, RangerUserInfo user, 
RangerAccessInfo access, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else {
+            boolean needsDelegation = isDelegationNeeded(caller, user) || 
isDelegationNeeded(access);
+
+            if (needsDelegation) {
+                if (!isDelegationUserForService(serviceName, caller)) {
+                    LOG.info("{} is not a delegation user in service {}", 
caller, serviceName);
+
+                    ret = Response.status(Response.Status.FORBIDDEN)
+                            .entity(new 
ErrorResponse(Response.Status.FORBIDDEN, caller + " is not authorized"))
+                            .build();
+                } else {
+                    ret = RESPONSE_OK;
+                }
+            } else {
+                ret = RESPONSE_OK;
+            }
+        }
+
+        return ret;
+    }
+
+    private Response validateCaller(String caller, RangerUserInfo user, 
List<RangerAccessInfo> accesses, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else {
+            boolean needsDelegation = isDelegationNeeded(caller, user) || 
isDelegationNeeded(accesses);
+
+            if (needsDelegation) {
+                if (!isDelegationUserForService(serviceName, caller)) {
+                    LOG.info("{} is not a delegation user in service {}", 
caller, serviceName);
+
+                    ret = Response.status(Response.Status.FORBIDDEN)
+                            .entity(new 
ErrorResponse(Response.Status.FORBIDDEN, caller + " is not authorized"))
+                            .build();
+                } else {
+                    ret = RESPONSE_OK;
+                }
+            } else {
+                ret = RESPONSE_OK;
+            }
+        }
+
+        return ret;
+    }
+
+    private Response validateCaller(String caller, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else if (!isDelegationUserForService(serviceName, caller)) {
+            LOG.info("{} is not a delegation user in service {}", caller, 
serviceName);
+
+            ret = Response.status(Response.Status.FORBIDDEN)
+                    .entity(new ErrorResponse(Response.Status.FORBIDDEN, 
caller + " is not authorized"))
+                    .build();
+        } else {
+            ret = RESPONSE_OK;
+        }
+
+        return ret;
+    }
+
+    private boolean isDelegationNeeded(String caller, RangerUserInfo user) {
+        String  userName        = user != null ? user.getName() : null;
+        boolean needsDelegation = !caller.equals(userName);
+
+        if (!needsDelegation) {
+            // don't trust user-attributes/groups/roles if caller doesn't have 
delegation permission
+            needsDelegation = MapUtils.isNotEmpty(user.getAttributes()) || 
CollectionUtils.isNotEmpty(user.getGroups()) || 
CollectionUtils.isNotEmpty(user.getRoles());
+        }
+
+        return needsDelegation;
+    }
+
+    private boolean isDelegationNeeded(RangerAccessInfo access) {
+        RangerResourceInfo resource = access != null ? access.getResource() : 
null;
+
+        // delegation permission is needed when resource attributes are 
specified
+        return (resource != null && 
MapUtils.isNotEmpty(resource.getAttributes()));
+    }
+
+    private boolean isDelegationNeeded(List<RangerAccessInfo> accesses) {
+        if (accesses != null) {
+            for (RangerAccessInfo access : accesses) {
+                if (isDelegationNeeded(access)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private RangerPdpStats getRuntimeState(HttpServletRequest httpRequest) {
+        Object state = 
httpRequest.getServletContext().getAttribute(RangerPdpConstants.SERVLET_CTX_ATTR_RUNTIME_STATE);
+
+        if (state instanceof RangerPdpStats) {
+            return (RangerPdpStats) state;
+        } else {
+            Object fallback = servletContext != null ? 
servletContext.getAttribute(RangerPdpConstants.SERVLET_CTX_ATTR_RUNTIME_STATE) 
: null;
+
+            return (fallback instanceof RangerPdpStats) ? (RangerPdpStats) 
fallback : new RangerPdpStats();
+        }
+    }
+
+    private void recordRequestMetrics(Response ret, long startNanos, 
HttpServletRequest httpRequest) {
+        RangerPdpStats state   = getRuntimeState(httpRequest);
+        int            status  = ret != null ? ret.getStatus() : 500;
+        long           elapsed = System.nanoTime() - startNanos;
+
+        if (status >= 200 && status < 300) {
+            state.recordRequestSuccess(elapsed);
+        } else if (status == 401 || status == 403) { // UNAUTHORIZED or 
FORBIDDEN
+            state.recordAuthFailure();
+        } else if (status == 400) {
+            state.recordRequestBadRequest(elapsed);
+        } else {
+            state.recordRequestError(elapsed);
+        }
+    }
+
+    /**
+     * Allowed-users config format:
+     *  ranger.pdp.service.<serviceName>.allowed.users=user1,user2
+     *  ranger.pdp.service.*.allowed.users=user3,user4
+     *
+     * If no allowed-users entries are configured, this check is disabled
+     * (backward-compatible behavior).

Review Comment:
   The Javadoc for the allowed/authorized callers uses the key suffix 
`.allowed.users`, but `initializeDelegationUsers()` actually loads keys ending 
with `RangerPdpConstants.PROP_SUFFIX_DELEGATION_USERS` (currently 
`.delegation.users`). With the current mismatch, users following the docs will 
configure `...allowed.users` and PDP will behave as if no users are configured. 
Align the documentation (and ideally naming) with the actual config keys used 
by the code, or change the constant/suffix parsing to match the documented 
property name.



##########
intg/src/main/python/apache_ranger/client/ranger_pdp_client.py:
##########
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from apache_ranger.client.ranger_client import RangerClientHttp
+from apache_ranger.model.ranger_authz   import RangerAuthzRequest, 
RangerAuthzResult
+from apache_ranger.model.ranger_authz   import RangerMultiAuthzRequest, 
RangerMultiAuthzResult
+from apache_ranger.model.ranger_authz   import RangerResourcePermissions, 
RangerResourcePermissionsRequest
+from apache_ranger.utils                import API, HttpMethod, HTTPStatus
+from apache_ranger.utils                import type_coerce
+
+LOG = logging.getLogger(__name__)
+
+
+class RangerPDPClient:
+    """
+    Python client for Ranger PDP authorization APIs.
+
+    Base URL should point to PDP server endpoint, for example:
+      http://localhost:6500
+    """
+
+    def __init__(self, url, auth, query_params=None, headers=None):
+        self.client_http = RangerClientHttp(url, auth, query_params, headers)
+        self.session     = self.client_http.session
+        logging.getLogger("requests").setLevel(logging.WARNING)
+
+    def authorize(self, authz_request):
+        """
+        Call POST /authz/v1/authorize
+        :param authz_request: dict-like or RangerAuthzRequest
+        :return: RangerAuthzResult
+        """
+        req  = type_coerce(authz_request, RangerAuthzRequest)
+        resp = self.client_http.call_api(RangerPDPClient.AUTHORIZE, 
request_data=req)
+        return type_coerce(resp, RangerAuthzResult)
+
+    def authorize_multi(self, multi_authz_request):
+        """
+        Call POST /authz/v1/authorizeMulti
+        :param multi_authz_request: dict-like or RangerMultiAuthzRequest
+        :return: RangerMultiAuthzResult
+        """
+        req  = type_coerce(multi_authz_request, RangerMultiAuthzRequest)
+        resp = self.client_http.call_api(RangerPDPClient.AUTHORIZE_MULTI, 
request_data=req)
+        return type_coerce(resp, RangerMultiAuthzResult)
+
+    def get_resource_permissions(self, resource, context=None):
+        """
+        Call POST /authz/v1/permissions
+        :param resource: dict-like resource payload OR 
RangerResourcePermissionsRequest
+        :param context: dict-like access context payload (ignored if 
'resource' is already request object)
+        :return: RangerResourcePermissions
+        """
+        req = type_coerce(resource, RangerResourcePermissionsRequest)
+        if req is None:
+            req = RangerResourcePermissionsRequest({
+                'resource': resource,
+                'context': context
+            })
+

Review Comment:
   `get_resource_permissions()` attempts to accept either a resource payload 
dict or a `RangerResourcePermissionsRequest`, but `type_coerce(resource, 
RangerResourcePermissionsRequest)` will coerce *any* dict into a 
`RangerResourcePermissionsRequest` (with `resource`/`context` likely `None`). 
This means passing a plain dict resource (as documented) results in an invalid 
request body. Consider detecting whether the dict already looks like a 
`RangerResourcePermissionsRequest` (e.g., contains `resource`/`context`) before 
coercing, or accept `RangerResourceInfo`/dict and always wrap it into a request 
object.



##########
pdp/src/main/java/org/apache/ranger/pdp/config/RangerPdpConfig.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Reads Ranger PDP configuration from {@code ranger-pdp-default.xml} 
(classpath)
+ * overridden by {@code ranger-pdp-site.xml} (classpath or filesystem).
+ *
+ * <p>Both files use the Hadoop {@code <configuration>} XML format, consistent
+ * with other Ranger server modules (tagsync, kms, etc.).
+ * The format is parsed directly using the JDK DOM API to avoid an early
+ * class-load dependency on Hadoop's {@code Configuration} class.
+ *
+ * <p>Authentication property names:
+ * <ul>
+ *   <li>Kerberos/SPNEGO:    {@code ranger.pdp.kerberos.spnego.*}
+ *   <li>JWT bearer token:   {@code ranger.pdp.jwt.*}
+ *   <li>HTTP header:        {@code ranger.pdp.authn.header.*}
+ * </ul>
+ */
+public class RangerPdpConfig {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerPdpConfig.class);
+
+    private static final String DEFAULT_CONFIG_FILE = "ranger-pdp-default.xml";
+    private static final String SITE_CONFIG_FILE    = "ranger-pdp-site.xml";
+
+    private final Properties props;
+
+    public RangerPdpConfig() {
+        props = new Properties();
+
+        loadFromClasspath(DEFAULT_CONFIG_FILE);
+        loadFromClasspath(SITE_CONFIG_FILE);
+
+        String confDir = System.getProperty(RangerPdpConstants.PROP_CONF_DIR, 
"");
+
+        if (StringUtils.isNotBlank(confDir)) {
+            loadFromFile(new File(confDir, SITE_CONFIG_FILE));
+        }
+
+        applySystemPropertyOverrides();
+
+        LOG.info("RangerPdpConfig initialized (conf.dir={})", confDir);
+    }
+
+    public int getPort() {
+        return getInt(RangerPdpConstants.PROP_PORT, 6500);
+    }
+
+    public String getLogDir() {
+        return get(RangerPdpConstants.PROP_LOG_DIR, "/var/log/ranger/pdp");
+    }
+
+    public boolean isSslEnabled() {
+        return getBoolean(RangerPdpConstants.PROP_SSL_ENABLED, false);
+    }
+
+    public String getKeystoreFile() {
+        return get(RangerPdpConstants.PROP_SSL_KEYSTORE_FILE, "");
+    }
+
+    public String getKeystorePassword() {
+        return get(RangerPdpConstants.PROP_SSL_KEYSTORE_PASSWORD, "");
+    }
+
+    public String getKeystoreType() {
+        return get(RangerPdpConstants.PROP_SSL_KEYSTORE_TYPE, "JKS");
+    }
+
+    public boolean isTruststoreEnabled() {
+        return getBoolean(RangerPdpConstants.PROP_SSL_TRUSTSTORE_ENABLED, 
false);
+    }
+
+    public String getTruststoreFile() {
+        return get(RangerPdpConstants.PROP_SSL_TRUSTSTORE_FILE, "");
+    }
+
+    public String getTruststorePassword() {
+        return get(RangerPdpConstants.PROP_SSL_TRUSTSTORE_PASSWORD, "");
+    }
+
+    public String getTruststoreType() {
+        return get(RangerPdpConstants.PROP_SSL_TRUSTSTORE_TYPE, "JKS");
+    }
+
+    public boolean isHttp2Enabled() {
+        return getBoolean(RangerPdpConstants.PROP_HTTP2_ENABLED, true);
+    }
+
+    public int getHttpConnectorMaxThreads() {
+        return getInt(RangerPdpConstants.PROP_HTTP_CONNECTOR_MAX_THREADS, 200);
+    }
+
+    public int getHttpConnectorMinSpareThreads() {
+        return 
getInt(RangerPdpConstants.PROP_HTTP_CONNECTOR_MIN_SPARE_THREADS, 20);
+    }
+
+    public int getHttpConnectorAcceptCount() {
+        return getInt(RangerPdpConstants.PROP_HTTP_CONNECTOR_ACCEPT_COUNT, 
100);
+    }
+
+    public int getHttpConnectorMaxConnections() {
+        return getInt(RangerPdpConstants.PROP_HTTP_CONNECTOR_MAX_CONNECTIONS, 
10000);
+    }
+
+    public String getAuthTypes() {
+        return get(RangerPdpConstants.PROP_AUTH_TYPES, "header,jwt,kerberos");
+    }

Review Comment:
   `getAuthTypes()` defaults to `header,jwt,kerberos`. Combined with the 
shipped defaults (empty JWT public key/JWKS URL and empty SPNEGO 
principal/keytab), this leads to `RangerPdpAuthFilter` failing to initialize 
and the server failing to start unless the user overrides these properties. 
Consider using a safer default here (e.g., require explicit config, or default 
to a single auth mechanism that can start with defaults) so the default 
distribution can boot predictably.



##########
pdp/src/main/java/org/apache/ranger/pdp/rest/RangerPdpREST.java:
##########
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.pdp.rest;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ranger.authz.api.RangerAuthorizer;
+import org.apache.ranger.authz.api.RangerAuthzException;
+import org.apache.ranger.authz.model.RangerAccessInfo;
+import org.apache.ranger.authz.model.RangerAuthzRequest;
+import org.apache.ranger.authz.model.RangerAuthzResult;
+import org.apache.ranger.authz.model.RangerMultiAuthzRequest;
+import org.apache.ranger.authz.model.RangerMultiAuthzResult;
+import org.apache.ranger.authz.model.RangerResourceInfo;
+import org.apache.ranger.authz.model.RangerResourcePermissions;
+import org.apache.ranger.authz.model.RangerResourcePermissionsRequest;
+import org.apache.ranger.authz.model.RangerUserInfo;
+import org.apache.ranger.pdp.RangerPdpStats;
+import org.apache.ranger.pdp.config.RangerPdpConfig;
+import org.apache.ranger.pdp.config.RangerPdpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_PDP_SERVICE_PREFIX;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.PROP_SUFFIX_DELEGATION_USERS;
+import static 
org.apache.ranger.pdp.config.RangerPdpConstants.WILDCARD_SERVICE_NAME;
+
+/**
+ * REST resource that exposes the three core {@link RangerAuthorizer} methods 
over HTTP.
+ *
+ * <p>All endpoints are under {@code /authz/v1} and produce/consume {@code 
application/json}.
+ * Authentication is enforced upstream by {@link RangerPdpAuthFilter}; the 
authenticated
+ * caller's identity is read from the {@link 
RangerPdpConstants#ATTR_AUTHENTICATED_USER}
+ * request attribute.
+ *
+ * <table border="1">
+ *   <tr><th>Method</th><th>Path</th><th>Request body</th><th>Response 
body</th></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorize</td>
+ *       <td>{@link RangerAuthzRequest}</td><td>{@link 
RangerAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/authorizeMulti</td>
+ *       <td>{@link RangerMultiAuthzRequest}</td><td>{@link 
RangerMultiAuthzResult}</td></tr>
+ *   <tr><td>POST</td><td>/authz/v1/permissions</td>
+ *       <td>{@link RangerResourcePermissionsRequest}</td>
+ *       <td>{@link RangerResourcePermissions}</td></tr>
+ * </table>
+ */
+@Path("/v1")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Singleton
+public class RangerPdpREST {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RangerPdpREST.class);
+
+    private static final Response RESPONSE_OK = Response.ok().build();
+
+    private final Map<String, Set<String>> delegationUsersByService = new 
HashMap<>();
+
+    @Inject
+    private RangerAuthorizer authorizer;
+
+    @Inject
+    private RangerPdpConfig config;
+
+    @Context
+    private ServletContext servletContext;
+
+    @PostConstruct
+    public void initialize() {
+        initializeDelegationUsers();
+    }
+
+    /**
+     * Evaluates a single access request.
+     *
+     * @param request the authorization request
+     * @return {@code 200 OK} with {@link RangerAuthzResult}, or {@code 400} / 
{@code 500} on error
+     */
+    @POST
+    @Path("/authorize")
+    public Response authorize(RangerAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String           requestId   = request != null ? 
request.getRequestId() : null;
+            String           caller      = getAuthenticatedUser(httpRequest);
+            String           serviceName = getServiceName(request);
+            RangerUserInfo   user        = request != null ? request.getUser() 
: null;
+            RangerAccessInfo access      = request != null ? 
request.getAccess() : null;
+
+            LOG.debug("==> authorize(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, access, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerAuthzResult result = authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorize(requestId={}): authorization error; 
caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorize(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorize(requestId={}, caller={}, serviceName={}): 
ret={}", requestId, caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Evaluates multiple access requests in a single call.
+     *
+     * @param request the multi-access authorization request
+     * @return {@code 200 OK} with {@link RangerMultiAuthzResult}, or {@code 
400} / {@code 500} on error
+     */
+    @POST
+    @Path("/authorizeMulti")
+    public Response authorizeMulti(RangerMultiAuthzRequest request, @Context 
HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String                 requestId   = request != null ? 
request.getRequestId() : null;
+            String                 caller      = 
getAuthenticatedUser(httpRequest);
+            String                 serviceName = getServiceName(request);
+            RangerUserInfo         user        = request != null ? 
request.getUser() : null;
+            List<RangerAccessInfo> accesses    = request != null ? 
request.getAccesses() : null;
+
+            LOG.debug("==> authorizeMulti(requestId={}, caller={}, 
serviceName={})", requestId, caller, serviceName);
+
+            ret = validateCaller(caller, user, accesses, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerMultiAuthzResult result = 
authorizer.authorize(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("authorizeMulti(requestId={}): authorization 
error; caller={}", requestId, caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("authorizeMulti(requestId={}): internal error; 
caller={}", requestId, caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("<== authorizeMulti(requestId={}, caller={}, 
serviceName={}): ret={}", requestId, caller, serviceName, ret != null ? 
ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the effective permissions for a resource, broken down by 
user/group/role.
+     *
+     * @param request wrapper containing the resource info and access context
+     * @return {@code 200 OK} with {@link RangerResourcePermissions}, or 
{@code 400} / {@code 500} on error
+     */
+    @POST
+    @Path("/permissions")
+    public Response getResourcePermissions(RangerResourcePermissionsRequest 
request, @Context HttpServletRequest httpRequest) {
+        long     startNanos = System.nanoTime();
+        Response ret        = null;
+
+        try {
+            String caller      = getAuthenticatedUser(httpRequest);
+            String serviceName = getServiceName(request);
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={})", 
caller, serviceName);
+
+            ret = validateCaller(caller, serviceName);
+
+            if (RESPONSE_OK.equals(ret)) {
+                try {
+                    RangerResourcePermissions result = 
authorizer.getResourcePermissions(request);
+
+                    ret = Response.ok(result).build();
+                } catch (RangerAuthzException e) {
+                    LOG.warn("getResourcePermissions(): validation error; 
caller={}", caller, e);
+
+                    ret = badRequest(e);
+                } catch (Exception e) {
+                    LOG.error("getResourcePermissions(): unexpected error; 
caller={}", caller, e);
+
+                    ret = serverError(e);
+                }
+            }
+
+            LOG.debug("==> getResourcePermissions(caller={}, serviceName={}): 
ret={}", caller, serviceName, ret != null ? ret.getStatus() : null);
+        } finally {
+            recordRequestMetrics(ret, startNanos, httpRequest);
+        }
+
+        return ret;
+    }
+
+    private String getAuthenticatedUser(HttpServletRequest httpRequest) {
+        Object user = 
httpRequest.getAttribute(RangerPdpConstants.ATTR_AUTHENTICATED_USER);
+
+        return user != null ? user.toString() : null;
+    }
+
+    private static String getServiceName(RangerAuthzRequest request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private static String getServiceName(RangerMultiAuthzRequest request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private static String getServiceName(RangerResourcePermissionsRequest 
request) {
+        return request != null && request.getContext() != null ? 
request.getContext().getServiceName() : null;
+    }
+
+    private Response validateCaller(String caller, RangerUserInfo user, 
RangerAccessInfo access, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else {
+            boolean needsDelegation = isDelegationNeeded(caller, user) || 
isDelegationNeeded(access);
+
+            if (needsDelegation) {
+                if (!isDelegationUserForService(serviceName, caller)) {
+                    LOG.info("{} is not a delegation user in service {}", 
caller, serviceName);
+
+                    ret = Response.status(Response.Status.FORBIDDEN)
+                            .entity(new 
ErrorResponse(Response.Status.FORBIDDEN, caller + " is not authorized"))
+                            .build();
+                } else {
+                    ret = RESPONSE_OK;
+                }
+            } else {
+                ret = RESPONSE_OK;
+            }
+        }
+
+        return ret;
+    }
+
+    private Response validateCaller(String caller, RangerUserInfo user, 
List<RangerAccessInfo> accesses, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else {
+            boolean needsDelegation = isDelegationNeeded(caller, user) || 
isDelegationNeeded(accesses);
+
+            if (needsDelegation) {
+                if (!isDelegationUserForService(serviceName, caller)) {
+                    LOG.info("{} is not a delegation user in service {}", 
caller, serviceName);
+
+                    ret = Response.status(Response.Status.FORBIDDEN)
+                            .entity(new 
ErrorResponse(Response.Status.FORBIDDEN, caller + " is not authorized"))
+                            .build();
+                } else {
+                    ret = RESPONSE_OK;
+                }
+            } else {
+                ret = RESPONSE_OK;
+            }
+        }
+
+        return ret;
+    }
+
+    private Response validateCaller(String caller, String serviceName) {
+        final Response ret;
+
+        if (StringUtils.isBlank(caller)) {
+            ret = Response.status(Response.Status.UNAUTHORIZED)
+                    .entity(new ErrorResponse(Response.Status.UNAUTHORIZED, 
"Authentication required"))
+                    .build();
+        } else if (!isDelegationUserForService(serviceName, caller)) {
+            LOG.info("{} is not a delegation user in service {}", caller, 
serviceName);
+
+            ret = Response.status(Response.Status.FORBIDDEN)
+                    .entity(new ErrorResponse(Response.Status.FORBIDDEN, 
caller + " is not authorized"))
+                    .build();
+        } else {
+            ret = RESPONSE_OK;
+        }
+
+        return ret;
+    }
+
+    private boolean isDelegationNeeded(String caller, RangerUserInfo user) {
+        String  userName        = user != null ? user.getName() : null;
+        boolean needsDelegation = !caller.equals(userName);
+
+        if (!needsDelegation) {
+            // don't trust user-attributes/groups/roles if caller doesn't have 
delegation permission
+            needsDelegation = MapUtils.isNotEmpty(user.getAttributes()) || 
CollectionUtils.isNotEmpty(user.getGroups()) || 
CollectionUtils.isNotEmpty(user.getRoles());
+        }
+
+        return needsDelegation;
+    }
+
+    private boolean isDelegationNeeded(RangerAccessInfo access) {
+        RangerResourceInfo resource = access != null ? access.getResource() : 
null;
+
+        // delegation permission is needed when resource attributes are 
specified
+        return (resource != null && 
MapUtils.isNotEmpty(resource.getAttributes()));
+    }
+
+    private boolean isDelegationNeeded(List<RangerAccessInfo> accesses) {
+        if (accesses != null) {
+            for (RangerAccessInfo access : accesses) {
+                if (isDelegationNeeded(access)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private RangerPdpStats getRuntimeState(HttpServletRequest httpRequest) {
+        Object state = 
httpRequest.getServletContext().getAttribute(RangerPdpConstants.SERVLET_CTX_ATTR_RUNTIME_STATE);
+
+        if (state instanceof RangerPdpStats) {
+            return (RangerPdpStats) state;
+        } else {
+            Object fallback = servletContext != null ? 
servletContext.getAttribute(RangerPdpConstants.SERVLET_CTX_ATTR_RUNTIME_STATE) 
: null;
+
+            return (fallback instanceof RangerPdpStats) ? (RangerPdpStats) 
fallback : new RangerPdpStats();
+        }
+    }
+
+    private void recordRequestMetrics(Response ret, long startNanos, 
HttpServletRequest httpRequest) {
+        RangerPdpStats state   = getRuntimeState(httpRequest);
+        int            status  = ret != null ? ret.getStatus() : 500;
+        long           elapsed = System.nanoTime() - startNanos;
+
+        if (status >= 200 && status < 300) {
+            state.recordRequestSuccess(elapsed);
+        } else if (status == 401 || status == 403) { // UNAUTHORIZED or 
FORBIDDEN
+            state.recordAuthFailure();
+        } else if (status == 400) {
+            state.recordRequestBadRequest(elapsed);
+        } else {
+            state.recordRequestError(elapsed);
+        }
+    }
+
+    /**
+     * Allowed-users config format:
+     *  ranger.pdp.service.<serviceName>.allowed.users=user1,user2
+     *  ranger.pdp.service.*.allowed.users=user3,user4
+     *
+     * If no allowed-users entries are configured, this check is disabled
+     * (backward-compatible behavior).
+     */
+    private boolean isDelegationUserForService(String serviceName, String 
userName) {
+        boolean ret;
+        Map<String, Set<String>> delegationUsersByService = 
this.delegationUsersByService;
+
+        if (delegationUsersByService.isEmpty()) {
+            ret = false;
+        } else if (StringUtils.isBlank(serviceName) || 
StringUtils.isBlank(userName)) {
+            ret = false;
+        } else {

Review Comment:
   `isDelegationUserForService()` returns `false` when 
`delegationUsersByService` is empty, but the preceding comment says the check 
should be disabled when no entries are configured. This causes 
`validateCaller(..., serviceName)` (used by `/permissions`) to always return 
403 when no config is present, and also blocks delegation in other endpoints 
contrary to the documented behavior. Consider treating an empty map as "no 
restriction" (return `true`) or update the comment + validation logic to match 
the intended default behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to