This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ebb794632ac Allow users with STATE permissions to read and write the
state APIs for querying with deep storage (#14944)
ebb794632ac is described below
commit ebb794632ac57612dcf6380783e6d4e64ec1b56d
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Sep 21 06:55:07 2023 +0530
Allow users with STATE permissions to read and write the state APIs for
querying with deep storage (#14944)
Currently, only the user who has submitted the async query has permission
to interact with the status APIs for that async query. However, often we want
an administrator to interact with these resources as well.
Druid traditionally handles this kind of administrative access through the
STATE resource. With this change, a requesting user who has the necessary
permission on the STATE resource (READ for the GET endpoints, WRITE for the
DELETE endpoint) is also allowed to interact with the status APIs,
irrespective of whether they are the submitter of the query.
---
.../msq/sql/resources/SqlStatementResource.java | 87 +++++++---
.../resources/SqlMSQStatementResourcePostTest.java | 6 +-
.../sql/resources/SqlStatementResourceTest.java | 187 ++++++++++++++++++++-
.../org/apache/druid/msq/test/MSQTestBase.java | 4 +
.../druid/sql/calcite/util/CalciteTests.java | 6 +-
5 files changed, 257 insertions(+), 33 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
index 1a1acaa008e..97be6dbc3bb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java
@@ -74,9 +74,14 @@ import org.apache.druid.query.QueryException;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlRowTransformer;
@@ -103,6 +108,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -120,6 +126,7 @@ public class SqlStatementResource
private final ObjectMapper jsonMapper;
private final OverlordClient overlordClient;
private final StorageConnector storageConnector;
+ private final AuthorizerMapper authorizerMapper;
@Inject
@@ -127,13 +134,15 @@ public class SqlStatementResource
final @MSQ SqlStatementFactory msqSqlStatementFactory,
final ObjectMapper jsonMapper,
final OverlordClient overlordClient,
- final @MultiStageQuery StorageConnector storageConnector
+ final @MultiStageQuery StorageConnector storageConnector,
+ final AuthorizerMapper authorizerMapper
)
{
this.msqSqlStatementFactory = msqSqlStatementFactory;
this.jsonMapper = jsonMapper;
this.overlordClient = overlordClient;
this.storageConnector = storageConnector;
+ this.authorizerMapper = authorizerMapper;
}
/**
@@ -178,7 +187,7 @@ public class SqlStatementResource
final boolean isTaskStruct =
MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());
if (isTaskStruct) {
- return buildTaskResponse(sequence,
stmt.query().authResult().getIdentity());
+ return buildTaskResponse(sequence, stmt.query().authResult());
} else {
// Used for EXPLAIN
return buildStandardResponse(sequence, modifiedQuery, sqlQueryId,
rowTransformer);
@@ -231,8 +240,9 @@ public class SqlStatementResource
Optional<SqlStatementResult> sqlStatementResult = getStatementStatus(
queryId,
- authenticationResult.getIdentity(),
- true
+ authenticationResult,
+ true,
+ Action.READ
);
if (sqlStatementResult.isPresent()) {
@@ -288,7 +298,11 @@ public class SqlStatementResource
throw queryNotFoundException(queryId);
}
- MSQControllerTask msqControllerTask =
getMSQControllerTaskOrThrow(queryId, authenticationResult.getIdentity());
+ MSQControllerTask msqControllerTask =
getMSQControllerTaskAndCheckPermission(
+ queryId,
+ authenticationResult,
+ Action.READ
+ );
throwIfQueryIsNotSuccessful(queryId, statusPlus);
Optional<List<ColumnNameAndTypes>> signature =
SqlStatementResourceHelper.getSignature(msqControllerTask);
@@ -353,8 +367,9 @@ public class SqlStatementResource
Optional<SqlStatementResult> sqlStatementResult = getStatementStatus(
queryId,
- authenticationResult.getIdentity(),
- false
+ authenticationResult,
+ false,
+ Action.WRITE
);
if (sqlStatementResult.isPresent()) {
switch (sqlStatementResult.get().getState()) {
@@ -448,7 +463,7 @@ public class SqlStatementResource
}
}
- private Response buildTaskResponse(Sequence<Object[]> sequence, String user)
+ private Response buildTaskResponse(Sequence<Object[]> sequence,
AuthenticationResult authenticationResult)
{
List<Object[]> rows = sequence.toList();
int numRows = rows.size();
@@ -464,7 +479,7 @@ public class SqlStatementResource
}
String taskId = String.valueOf(firstRow[0]);
- Optional<SqlStatementResult> statementResult = getStatementStatus(taskId,
user, true);
+ Optional<SqlStatementResult> statementResult = getStatementStatus(taskId,
authenticationResult, true, Action.READ);
if (statementResult.isPresent()) {
return
Response.status(Response.Status.OK).entity(statementResult.get()).build();
@@ -565,8 +580,12 @@ public class SqlStatementResource
}
- private Optional<SqlStatementResult> getStatementStatus(String queryId,
String currentUser, boolean withResults)
- throws DruidException
+ private Optional<SqlStatementResult> getStatementStatus(
+ String queryId,
+ AuthenticationResult authenticationResult,
+ boolean withResults,
+ Action forAction
+ ) throws DruidException
{
TaskStatusResponse taskResponse =
contactOverlord(overlordClient.taskStatus(queryId), queryId);
if (taskResponse == null) {
@@ -579,7 +598,7 @@ public class SqlStatementResource
}
// since we need the controller payload for auth checks.
- MSQControllerTask msqControllerTask = getMSQControllerTaskOrThrow(queryId,
currentUser);
+ MSQControllerTask msqControllerTask =
getMSQControllerTaskAndCheckPermission(queryId, authenticationResult,
forAction);
SqlStatementState sqlStatementState =
SqlStatementResourceHelper.getSqlStatementState(statusPlus);
if (SqlStatementState.FAILED == sqlStatementState) {
@@ -610,7 +629,20 @@ public class SqlStatementResource
}
- private MSQControllerTask getMSQControllerTaskOrThrow(String queryId, String
currentUser)
+ /**
+ * This method contacts the overlord for the controller task and checks if
the requested user has the
+ * necessary permissions. A user has the necessary permissions if one of the
following criteria is satisfied:
+ * 1. The user is the one who submitted the query
+ * 2. The user belongs to a role containing the READ or WRITE permissions
over the STATE resource. For endpoints like GET,
+ * the user should have READ permission for the STATE resource, while for
endpoints like DELETE, the user should
+ * have WRITE permission for the STATE resource. (Note: POST API does not
need to check the state permissions since
+ * the currentUser is always equal to the queryUser)
+ */
+ private MSQControllerTask getMSQControllerTaskAndCheckPermission(
+ String queryId,
+ AuthenticationResult authenticationResult,
+ Action forAction
+ ) throws ForbiddenException
{
TaskPayloadResponse taskPayloadResponse =
contactOverlord(overlordClient.taskPayload(queryId), queryId);
SqlStatementResourceHelper.isMSQPayload(taskPayloadResponse, queryId);
@@ -620,15 +652,28 @@ public class SqlStatementResource
.getQuery()
.getContext()
.get(MSQTaskQueryMaker.USER_KEY));
- if (currentUser == null || !currentUser.equals(queryUser)) {
- throw new ForbiddenException(StringUtils.format(
- "The current user[%s] cannot view query id[%s] since the query is
owned by user[%s]",
- currentUser,
- queryId,
- queryUser
- ));
+
+ String currentUser = authenticationResult.getIdentity();
+
+ if (currentUser != null && currentUser.equals(queryUser)) {
+ return msqControllerTask;
+ }
+
+ Access access = AuthorizationUtils.authorizeAllResourceActions(
+ authenticationResult,
+ Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE,
forAction)),
+ authorizerMapper
+ );
+
+ if (access.isAllowed()) {
+ return msqControllerTask;
}
- return msqControllerTask;
+
+ throw new ForbiddenException(StringUtils.format(
+ "The current user[%s] cannot view query id[%s] since the query is
owned by another user",
+ currentUser,
+ queryId
+ ));
}
/**
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
index 0a20dd12066..415e36a02d4 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java
@@ -74,7 +74,8 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
sqlStatementFactory,
objectMapper,
indexingServiceClient,
- localFileStorageConnector
+ localFileStorageConnector,
+ authorizerMapper
);
}
@@ -274,7 +275,8 @@ public class SqlMSQStatementResourcePostTest extends
MSQTestBase
sqlStatementFactory,
objectMapper,
indexingServiceClient,
- NilStorageConnector.getInstance()
+ NilStorageConnector.getInstance(),
+ authorizerMapper
);
String errorMessage = "The sql statement api cannot read from the select
destination [durableStorage] provided in "
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index ec986a93159..d6572801207 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -75,8 +75,13 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.mocks.MockHttpServletRequest;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.http.ResultFormat;
@@ -321,8 +326,6 @@ public class SqlStatementResourceTest extends MSQTestBase
)
);
private static final DateTime QUEUE_INSERTION_TIME =
DateTimes.of("2023-05-31T12:01Z");
- private static final Map<String, Object> ROW1 = ImmutableMap.of("_time",
123, "alias", "foo", "market", "bar");
- private static final Map<String, Object> ROW2 = ImmutableMap.of("_time",
234, "alias", "foo1", "market", "bar1");
public static final ImmutableList<ColumnNameAndTypes> COL_NAME_AND_TYPES =
ImmutableList.of(
new ColumnNameAndTypes(
"_time",
@@ -343,6 +346,47 @@ public class SqlStatementResourceTest extends MSQTestBase
private static final String FAILURE_MSG = "failure msg";
private static SqlStatementResource resource;
+ private static String SUPERUSER = "superuser";
+ private static String STATE_R_USER = "stateR";
+ private static String STATE_W_USER = "stateW";
+ private static String STATE_RW_USER = "stateRW";
+
+ private AuthorizerMapper authorizerMapper = new AuthorizerMapper(null)
+ {
+ @Override
+ public Authorizer getAuthorizer(String name)
+ {
+ return (authenticationResult, resource, action) -> {
+ if (SUPERUSER.equals(authenticationResult.getIdentity())) {
+ return Access.OK;
+ }
+
+ switch (resource.getType()) {
+ case ResourceType.DATASOURCE:
+ case ResourceType.VIEW:
+ case ResourceType.QUERY_CONTEXT:
+ case ResourceType.EXTERNAL:
+ return Access.OK;
+ case ResourceType.STATE:
+ String identity = authenticationResult.getIdentity();
+ if (action == Action.READ) {
+ if (STATE_R_USER.equals(identity) ||
STATE_RW_USER.equals(identity)) {
+ return Access.OK;
+ }
+ } else if (action == Action.WRITE) {
+ if (STATE_W_USER.equals(identity) ||
STATE_RW_USER.equals(identity)) {
+ return Access.OK;
+ }
+ }
+ return Access.DENIED;
+
+ default:
+ return Access.DENIED;
+ }
+ };
+ }
+ };
+
@Mock
private OverlordClient overlordClient;
@@ -635,7 +679,7 @@ public class SqlStatementResourceTest extends MSQTestBase
return makeExpectedReq(CalciteTests.REGULAR_USER_AUTH_RESULT);
}
- public static MockHttpServletRequest makeExpectedReq(AuthenticationResult
authenticationResult)
+ private static MockHttpServletRequest makeExpectedReq(AuthenticationResult
authenticationResult)
{
MockHttpServletRequest req = new MockHttpServletRequest();
req.attributes.put(AuthConfig.DRUID_AUTHENTICATION_RESULT,
authenticationResult);
@@ -643,6 +687,16 @@ public class SqlStatementResourceTest extends MSQTestBase
return req;
}
+ private static AuthenticationResult makeAuthResultForUser(String user)
+ {
+ return new AuthenticationResult(
+ user,
+ AuthConfig.ALLOW_ALL_NAME,
+ null,
+ null
+ );
+ }
+
@Before
public void init() throws Exception
{
@@ -652,7 +706,8 @@ public class SqlStatementResourceTest extends MSQTestBase
sqlStatementFactory,
objectMapper,
overlordClient,
- new LocalFileStorageConnector(tmpFolder.newFolder("local"))
+ new LocalFileStorageConnector(tmpFolder.newFolder("local")),
+ authorizerMapper
);
}
@@ -918,29 +973,145 @@ public class SqlStatementResourceTest extends MSQTestBase
}
@Test
- public void testForbiddenRequest()
+ public void testAPIBehaviourWithSuperUsers()
{
+ Assert.assertEquals(
+ Response.Status.OK.getStatusCode(),
+ resource.doGetStatus(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(makeAuthResultForUser(SUPERUSER))
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.BAD_REQUEST.getStatusCode(),
+ resource.doGetResults(
+ RUNNING_SELECT_MSQ_QUERY,
+ 1L,
+ null,
+ makeExpectedReq(makeAuthResultForUser(SUPERUSER))
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.ACCEPTED.getStatusCode(),
+ resource.deleteQuery(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(makeAuthResultForUser(SUPERUSER))
+ ).getStatus()
+ );
+ }
+
+ @Test
+ public void testAPIBehaviourWithDifferentUserAndNoStatePermission()
+ {
+ AuthenticationResult differentUserAuthResult =
makeAuthResultForUser("differentUser");
Assert.assertEquals(
Response.Status.FORBIDDEN.getStatusCode(),
resource.doGetStatus(
RUNNING_SELECT_MSQ_QUERY,
- makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.FORBIDDEN.getStatusCode(),
+ resource.doGetResults(
+ RUNNING_SELECT_MSQ_QUERY,
+ 1L,
+ null,
+ makeExpectedReq(differentUserAuthResult)
).getStatus()
);
Assert.assertEquals(
Response.Status.FORBIDDEN.getStatusCode(),
+ resource.deleteQuery(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ }
+
+ @Test
+ public void testAPIBehaviourWithDifferentUserAndStateRPermission()
+ {
+ AuthenticationResult differentUserAuthResult =
makeAuthResultForUser(STATE_R_USER);
+ Assert.assertEquals(
+ Response.Status.OK.getStatusCode(),
+ resource.doGetStatus(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.BAD_REQUEST.getStatusCode(),
resource.doGetResults(
RUNNING_SELECT_MSQ_QUERY,
1L,
null,
- makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)
+ makeExpectedReq(differentUserAuthResult)
).getStatus()
);
Assert.assertEquals(
Response.Status.FORBIDDEN.getStatusCode(),
resource.deleteQuery(
RUNNING_SELECT_MSQ_QUERY,
- makeExpectedReq(CalciteTests.SUPER_USER_AUTH_RESULT)
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ }
+
+ @Test
+ public void testAPIBehaviourWithDifferentUserAndStateWPermission()
+ {
+ AuthenticationResult differentUserAuthResult =
makeAuthResultForUser(STATE_W_USER);
+ Assert.assertEquals(
+ Response.Status.FORBIDDEN.getStatusCode(),
+ resource.doGetStatus(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.FORBIDDEN.getStatusCode(),
+ resource.doGetResults(
+ RUNNING_SELECT_MSQ_QUERY,
+ 1L,
+ null,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.ACCEPTED.getStatusCode(),
+ resource.deleteQuery(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ }
+
+ @Test
+ public void testAPIBehaviourWithDifferentUserAndStateRWPermission()
+ {
+ AuthenticationResult differentUserAuthResult =
makeAuthResultForUser(STATE_RW_USER);
+ Assert.assertEquals(
+ Response.Status.OK.getStatusCode(),
+ resource.doGetStatus(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.BAD_REQUEST.getStatusCode(),
+ resource.doGetResults(
+ RUNNING_SELECT_MSQ_QUERY,
+ 1L,
+ null,
+ makeExpectedReq(differentUserAuthResult)
+ ).getStatus()
+ );
+ Assert.assertEquals(
+ Response.Status.ACCEPTED.getStatusCode(),
+ resource.deleteQuery(
+ RUNNING_SELECT_MSQ_QUERY,
+ makeExpectedReq(differentUserAuthResult)
).getStatus()
);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 9ebcb2ec53e..6b8ea3e0ae0 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -146,6 +146,7 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
@@ -288,6 +289,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected MSQTestOverlordServiceClient indexingServiceClient;
protected MSQTestTaskActionClient testTaskActionClient;
protected SqlStatementFactory sqlStatementFactory;
+ protected AuthorizerMapper authorizerMapper;
private IndexIO indexIO;
private MSQTestSegmentManager segmentManager;
@@ -526,6 +528,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine,
plannerFactory);
+
+ authorizerMapper = CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER;
}
protected CatalogResolver createMockCatalogResolver()
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index bab3e0957e8..df6dd781ca0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -209,13 +209,15 @@ public class CalciteTests
public static final AuthenticationResult REGULAR_USER_AUTH_RESULT = new
AuthenticationResult(
AuthConfig.ALLOW_ALL_NAME,
AuthConfig.ALLOW_ALL_NAME,
- null, null
+ null,
+ null
);
public static final AuthenticationResult SUPER_USER_AUTH_RESULT = new
AuthenticationResult(
TEST_SUPERUSER_NAME,
AuthConfig.ALLOW_ALL_NAME,
- null, null
+ null,
+ null
);
public static final Injector INJECTOR = new CalciteTestInjectorBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]