This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 03b0dae77bd Fix resourceName used in auth check for SupervisorsTable
in SystemSchema (#18985)
03b0dae77bd is described below
commit 03b0dae77bd1953f8da8bfc44adaafa2e2d85897
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Thu Feb 19 22:34:36 2026 -0800
Fix resourceName used in auth check for SupervisorsTable in SystemSchema
(#18985)
* Fix resourceName used in auth check for SupervisorsTable in SystemSchema
---
.../overlord/supervisor/SupervisorSpec.java | 10 +-
.../druid/sql/calcite/schema/SystemSchema.java | 2 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 103 ++++++++++++++++++++-
3 files changed, 108 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 377223308b0..fe9dd8f3942 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -38,7 +38,7 @@ import java.util.Set;
public interface SupervisorSpec
{
/**
- * Return an unique id of {@link Supervisor}.
+ * Return a unique id of {@link Supervisor}.
*/
String getId();
@@ -93,10 +93,12 @@ public interface SupervisorSpec
}
/**
- * This API is only used for informational purposes in
- * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable
+ * Return source (like stream or topic name) for the supervisor.
+ * This API is currently used for spec validation in {@link
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec}
+ * and for returning the spec with source in {@link
org.apache.druid.indexing.overlord.supervisor.SupervisorResource}
+ * when the spec is requested with the system flag.
*
- * @return source like stream or topic name
+ * @return source
*/
String getSource();
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index f461acfa687..58e1f0cd1b3 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -1097,7 +1097,7 @@ public class SystemSchema extends AbstractSchema
);
Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator =
supervisor -> Collections.singletonList(
-
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
+
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getDataSource()));
final Iterable<SupervisorStatus> authorizedSupervisors =
AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index ae15e124198..fab2a0ca438 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -151,6 +151,8 @@ public class SystemSchemaTest extends CalciteTestBase
{
private static final ObjectMapper MAPPER = CalciteTests.getJsonMapper();
+ private static final String DATASOURCE_ALL_ACCESS = "allAccess";
+
private static final BrokerSegmentMetadataCacheConfig
SEGMENT_CACHE_CONFIG_DEFAULT = BrokerSegmentMetadataCacheConfig.create();
private static final List<InputRow> ROWS1 = ImmutableList.of(
@@ -1490,8 +1492,9 @@ public class SystemSchemaTest extends CalciteTestBase
SystemSchema.SupervisorsTable supervisorTable =
new SystemSchema.SupervisorsTable(overlordClient, createAuthMapper());
- final String json = "[{\n"
- + "\t\"id\": \"wikipedia\",\n"
+ String json = "[{\n"
+ + "\t\"id\": \"wikipedia_supervisor\",\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
+ "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ "\t\"healthy\": false,\n"
@@ -1679,6 +1682,100 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.verify(mockEngine);
}
+ @Test
+ public void testSupervisorTableAuthOnDataSourceName() throws
JsonProcessingException
+ {
+ SystemSchema.SupervisorsTable supervisorTable =
+ new SystemSchema.SupervisorsTable(overlordClient, createAuthMapper());
+
+ // Verify that 1 row is returned for datasource name DATASOURCE_ALL_ACCESS
+ String datasourceAllAccessSupervisor =
+ "[{\n"
+ + "\t\"id\": \"wikipedia_supervisor\",\n"
+ + "\t\"dataSource\": \"" + DATASOURCE_ALL_ACCESS + "\",\n"
+ + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ + "\t\"healthy\": false,\n"
+ + "\t\"specString\":
\"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ + "\t\"type\": \"kafka\",\n"
+ + "\t\"source\": \"wikipedia\",\n"
+ + "\t\"suspended\": false\n"
+ + "}]";
+ EasyMock.expect(overlordClient.supervisorStatuses()).andAnswer(
+ () -> Futures.immediateFuture(
+ CloseableIterators.withEmptyBaggage(
+ MAPPER.readValue(datasourceAllAccessSupervisor, new
TypeReference<List<SupervisorStatus>>() {}).iterator()
+ )
+ )
+ ).times(1);
+ EasyMock.replay(overlordClient);
+ List<Object[]> rows = supervisorTable
+ .scan(createDataContext(Users.ONLY_DATASOURCE_ALL_ACCESS))
+ .toList();
+ Assert.assertEquals(1, rows.size());
+ EasyMock.verify(overlordClient);
+ EasyMock.reset(overlordClient);
+
+ // Verify that no row is returned when the datasource name does not match
DATASOURCE_ALL_ACCESS
+ String datasourceNotAllAccess =
+ "[{\n"
+ + "\t\"id\": \"wikipedia_supervisor\",\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
+ + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ + "\t\"healthy\": false,\n"
+ + "\t\"specString\":
\"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ + "\t\"type\": \"kafka\",\n"
+ + "\t\"source\": \"wikipedia\",\n"
+ + "\t\"suspended\": false\n"
+ + "}]";
+ EasyMock.expect(overlordClient.supervisorStatuses()).andAnswer(
+ () -> Futures.immediateFuture(
+ CloseableIterators.withEmptyBaggage(
+ MAPPER.readValue(datasourceNotAllAccess, new
TypeReference<List<SupervisorStatus>>() {}).iterator()
+ )
+ )
+ ).times(1);
+ EasyMock.replay(overlordClient);
+ rows = supervisorTable
+ .scan(createDataContext(Users.ONLY_DATASOURCE_ALL_ACCESS))
+ .toList();
+ Assert.assertTrue(rows.isEmpty());
+ EasyMock.verify(overlordClient);
+ EasyMock.reset(overlordClient);
+
+ // Verify that no row is returned when only the supervisor id and source (not the dataSource) match
DATASOURCE_ALL_ACCESS
+ String datasourceSourceAndIdAllAccess =
+ "[{\n"
+ + "\t\"id\": \"" + DATASOURCE_ALL_ACCESS + "\",\n"
+ + "\t\"dataSource\": \"wikipedia\",\n"
+ + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ + "\t\"healthy\": false,\n"
+ + "\t\"specString\":
\"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ + "\t\"type\": \"kafka\",\n"
+ + "\t\"source\": \"" + DATASOURCE_ALL_ACCESS + "\",\n"
+ + "\t\"suspended\": false\n"
+ + "}]";
+ EasyMock.expect(overlordClient.supervisorStatuses()).andAnswer(
+ () -> Futures.immediateFuture(
+ CloseableIterators.withEmptyBaggage(
+ MAPPER.readValue(datasourceSourceAndIdAllAccess, new
TypeReference<List<SupervisorStatus>>() {}).iterator()
+ )
+ )
+ ).times(1);
+ EasyMock.replay(overlordClient);
+ rows = supervisorTable
+ .scan(createDataContext(Users.ONLY_DATASOURCE_ALL_ACCESS))
+ .toList();
+ Assert.assertTrue(rows.isEmpty());
+ EasyMock.verify(overlordClient);
+ EasyMock.reset(overlordClient);
+ }
+
/**
* Creates a test QueryInfo implementation for testing purposes.
*/
@@ -1781,6 +1878,7 @@ public class SystemSchemaTest extends CalciteTestBase
username.equals(Users.SUPER)
|| (action == Action.READ &&
username.equals(Users.DATASOURCE_READ))
|| (action == Action.WRITE &&
username.equals(Users.DATASOURCE_WRITE))
+ || (resource.getName().equals(DATASOURCE_ALL_ACCESS))
);
}
@@ -1865,5 +1963,6 @@ public class SystemSchemaTest extends CalciteTestBase
private static final String SUPER = CalciteTests.TEST_SUPERUSER_NAME;
private static final String DATASOURCE_READ = "datasourceRead";
private static final String DATASOURCE_WRITE = "datasourceWrite";
+ private static final String ONLY_DATASOURCE_ALL_ACCESS =
"onlyDatasourceAllAccess";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]