This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d602e3acf3e fix: Make `sys.server_properties` table filterable and
resilient to unreachable servers (#19459)
d602e3acf3e is described below
commit d602e3acf3e87f7118a9657e75e560667a618eae
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Thu May 14 09:15:47 2026 -0700
fix: Make `sys.server_properties` table filterable and resilient to
unreachable servers (#19459)
1. Made sys.server_properties filterable, similar to a few other sys tables
like SegmentsTable — changed from ScannableTable to ProjectableFilterableTable,
enabling Calcite to push down filters (e.g., WHERE server = '...') and
projections to the table scan.
2. Graceful error handling — Previously, if any server was unreachable or
returned an HTTP error, the entire query threw an exception and failed. Now the
table returns a row for that server with property/value as null and the new
error_message column populated with what went wrong.
3. New error_message column — this column indicates if properties couldn't
be fetched (e.g., "Connection refused", "HTTP 503: Service Unavailable"). Null
on success.
---
docs/querying/sql-metadata-tables.md | 3 +-
.../schema/SystemServerPropertiesTableTest.java | 123 +++++-
.../schema/SystemServerPropertiesTable.java | 223 ++++++++--
.../druid/sql/calcite/schema/SystemSchemaTest.java | 455 ++++++++++++++++++++-
website/.spelling | 1 +
5 files changed, 758 insertions(+), 47 deletions(-)
diff --git a/docs/querying/sql-metadata-tables.md
b/docs/querying/sql-metadata-tables.md
index ebf5da39eba..53184a034a9 100644
--- a/docs/querying/sql-metadata-tables.md
+++ b/docs/querying/sql-metadata-tables.md
@@ -322,7 +322,7 @@ SELECT * FROM sys.supervisors WHERE healthy=0;
### SERVER_PROPERTIES table
-The `server_properties` table exposes the runtime properties configured on for
each Druid server. Each row represents a single property key-value pair
associated with a specific server.
+The `server_properties` table exposes the runtime properties configured on
each Druid server. Each row represents a single property key-value pair
associated with a specific server. This table supports filter and projection
pushdown for efficient querying. If a server is unreachable, the table still
returns a row for that server with the `error_message` column populated instead
of failing the entire query.
|Column|Type|Notes|
|------|-----|-----|
@@ -331,6 +331,7 @@ The `server_properties` table exposes the runtime
properties configured on for e
|node_roles|VARCHAR|Comma-separated list of roles that the server performs.
For example, `[coordinator,overlord]` if the server functions as both a
Coordinator and an Overlord.|
|property|VARCHAR|Name of the property|
|value|VARCHAR|Value of the property|
+|error_message|VARCHAR|Describes why properties could not be retrieved from
the server (e.g., connection refused, HTTP error). Null when properties were
fetched successfully.|
For example, to retrieve properties for a specific server, use the query
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java
index d5f062e0d47..4a4fee4eb1a 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/SystemServerPropertiesTableTest.java
@@ -119,14 +119,14 @@ public class SystemServerPropertiesTableTest extends
EmbeddedClusterTestBase
);
Assertions.assertEquals(
- StringUtils.format("localhost:%s,%s,[%s],test.onlyBroker,brokerValue",
BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME),
+
StringUtils.format("localhost:%s,%s,[%s],test.onlyBroker,brokerValue,",
BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME),
cluster.runSql("SELECT * FROM sys.server_properties WHERE server =
'localhost:%s' AND property = 'test.onlyBroker'", BROKER_PORT)
);
String[] expectedRows = new String[] {
-
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,brokerNonUniqueValue",
BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME),
-
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,overlordNonUniqueValue",
OVERLORD_PORT, OVERLORD_SERVICE, NodeRole.OVERLORD_JSON_NAME),
-
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,coordinatorNonUniqueValue",
COORDINATOR_PORT, COORDINATOR_SERVICE, NodeRole.COORDINATOR_JSON_NAME),
+
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,brokerNonUniqueValue,",
BROKER_PORT, BROKER_SERVICE, NodeRole.BROKER_JSON_NAME),
+
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,overlordNonUniqueValue,",
OVERLORD_PORT, OVERLORD_SERVICE, NodeRole.OVERLORD_JSON_NAME),
+
StringUtils.format("localhost:%s,%s,[%s],test.nonUniqueProperty,coordinatorNonUniqueValue,",
COORDINATOR_PORT, COORDINATOR_SERVICE, NodeRole.COORDINATOR_JSON_NAME),
};
Arrays.sort(expectedRows, String::compareTo);
final String result = cluster.runSql("SELECT * FROM sys.server_properties
WHERE property='test.nonUniqueProperty'");
@@ -146,6 +146,118 @@ public class SystemServerPropertiesTableTest extends
EmbeddedClusterTestBase
Assertions.assertFalse(brokerProps.containsKey("password"));
}
+ @Test
+ public void test_serverPropertiesTable_serverFilterPushdown()
+ {
+ final String brokerHost = StringUtils.format("localhost:%s", BROKER_PORT);
+
+ // Equality filter returns only matching server rows
+ final String result = cluster.runSql(
+ "SELECT server, service_name, property FROM sys.server_properties
WHERE server = '%s'",
+ brokerHost
+ );
+ Assertions.assertFalse(result.isEmpty(), "Should return properties for the
broker");
+ for (String row : result.split("\n")) {
+ Assertions.assertTrue(
+ row.startsWith(brokerHost + ","),
+ "Row should belong to filtered server: " + row
+ );
+ }
+
+ // Non-existent server returns no rows
+ final String emptyResult = cluster.runSql(
+ "SELECT * FROM sys.server_properties WHERE server = 'nonexistent:9999'"
+ );
+ Assertions.assertTrue(emptyResult.isEmpty(), "Non-existent server filter
should return no rows");
+
+ // != is not consumed — falls back to Calcite post-filter, still correct
+ final String neResult = cluster.runSql(
+ "SELECT DISTINCT server FROM sys.server_properties WHERE server !=
'%s'",
+ brokerHost
+ );
+ Assertions.assertFalse(neResult.isEmpty(), "!= filter should still return
other servers");
+ for (String row : neResult.split("\n")) {
+ Assertions.assertFalse(
+ row.trim().equals(brokerHost),
+ "!= filter should exclude the broker: " + row
+ );
+ }
+
+ // AND with a non-pushdown predicate — server filter consumed, rest
handled by Calcite
+ final String andResult = cluster.runSql(
+ "SELECT server, property FROM sys.server_properties WHERE server =
'%s' AND node_roles LIKE '%%broker%%'",
+ brokerHost
+ );
+ Assertions.assertFalse(andResult.isEmpty(), "AND with node_roles filter
should return rows");
+ for (String row : andResult.split("\n")) {
+ Assertions.assertTrue(
+ row.startsWith(brokerHost + ","),
+ "Row should belong to filtered server: " + row
+ );
+ }
+ }
+
+ @Test
+ public void test_serverPropertiesTable_serviceNameFilterPushdown()
+ {
+ final String brokerHost = StringUtils.format("localhost:%s", BROKER_PORT);
+
+ // Equality filter on service_name returns only matching rows
+ final String result = cluster.runSql(
+ "SELECT server, service_name, property FROM sys.server_properties
WHERE service_name = '%s'",
+ BROKER_SERVICE
+ );
+ Assertions.assertFalse(result.isEmpty(), "Should return properties for the
broker service");
+ for (String row : result.split("\n")) {
+ String[] cols = row.split(",", -1);
+ Assertions.assertEquals(BROKER_SERVICE, cols[1], "Row should belong to
filtered service_name: " + row);
+ }
+
+ // Non-existent service_name returns no rows
+ final String emptyResult = cluster.runSql(
+ "SELECT * FROM sys.server_properties WHERE service_name =
'nonexistent/service'"
+ );
+ Assertions.assertTrue(emptyResult.isEmpty(), "Non-existent service_name
filter should return no rows");
+
+ // != falls back to Calcite post-filter
+ final String neResult = cluster.runSql(
+ "SELECT DISTINCT service_name FROM sys.server_properties WHERE
service_name != '%s'",
+ BROKER_SERVICE
+ );
+ Assertions.assertFalse(neResult.isEmpty(), "!= filter should still return
other services");
+ for (String row : neResult.split("\n")) {
+ Assertions.assertFalse(
+ row.trim().equals(BROKER_SERVICE),
+ "!= filter should exclude the broker service: " + row
+ );
+ }
+
+ // Both server and service_name filters consumed together
+ final String andResult = cluster.runSql(
+ "SELECT server, service_name, property FROM sys.server_properties
WHERE service_name = '%s' AND server = '%s'",
+ BROKER_SERVICE, brokerHost
+ );
+ Assertions.assertFalse(andResult.isEmpty(), "AND with server and
service_name should return rows");
+ for (String row : andResult.split("\n")) {
+ String[] cols = row.split(",", -1);
+ Assertions.assertEquals(brokerHost, cols[0], "Row server should match: "
+ row);
+ Assertions.assertEquals(BROKER_SERVICE, cols[1], "Row service_name
should match: " + row);
+ }
+ }
+
+ @Test
+ public void test_serverPropertiesTable_errorMessageIsNullForHealthyServers()
+ {
+ // All 3 servers in the embedded cluster are healthy, so no rows should
have a non-null error_message
+ final String errorRows = cluster.runSql("SELECT server FROM
sys.server_properties WHERE error_message IS NOT NULL");
+ Assertions.assertTrue(errorRows.isEmpty(), "Healthy servers should have
null error_message");
+
+ // Every row should have a null error_message
+ final String totalCount = cluster.runSql("SELECT COUNT(*) FROM
sys.server_properties");
+ final String nullErrorCount = cluster.runSql("SELECT COUNT(*) FROM
sys.server_properties WHERE error_message IS NULL");
+ Assertions.assertEquals(totalCount, nullErrorCount, "All rows should have
null error_message in a healthy cluster");
+ }
+
private void verifyPropertiesForServer(Map<String, String> properties,
String serivceName, String hostAndPort, String nodeRole)
{
String[] expectedRows = properties.entrySet().stream().map(entry ->
String.join(
@@ -154,7 +266,8 @@ public class SystemServerPropertiesTableTest extends
EmbeddedClusterTestBase
escapeCsvField(serivceName),
escapeCsvField(ImmutableList.of(nodeRole).toString()),
escapeCsvField(entry.getKey()),
- escapeCsvField(entry.getValue())
+ escapeCsvField(entry.getValue()),
+ escapeCsvField(null)
)).toArray(String[]::new);
Arrays.sort(expectedRows, String::compareTo);
final String result = cluster.runSql("SELECT * FROM sys.server_properties
WHERE server='%s'", hostAndPort);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java
index df8b313c8e1..4b25c3cefa1 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemServerPropertiesTable.java
@@ -27,13 +27,18 @@ import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.sql.SqlKind;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.error.InternalServerError;
-import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
@@ -47,15 +52,19 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -64,8 +73,10 @@ import java.util.stream.Collectors;
* that server would have multiple values in the column {@code node_roles}
rather than duplicating all the
* rows.
*/
-public class SystemServerPropertiesTable extends AbstractTable implements
ScannableTable
+public class SystemServerPropertiesTable extends AbstractTable implements
ProjectableFilterableTable
{
+ private static final Logger log = new
Logger(SystemServerPropertiesTable.class);
+
public static final String TABLE_NAME = "server_properties";
static final RowSignature ROW_SIGNATURE = RowSignature
@@ -75,8 +86,16 @@ public class SystemServerPropertiesTable extends
AbstractTable implements Scanna
.add("node_roles", ColumnType.STRING)
.add("property", ColumnType.STRING)
.add("value", ColumnType.STRING)
+ .add("error_message", ColumnType.STRING)
.build();
+ private static final int SERVER_INDEX = ROW_SIGNATURE.indexOf("server");
+ private static final int SERVICE_NAME_INDEX =
ROW_SIGNATURE.indexOf("service_name");
+ private static final int NODE_ROLES_INDEX =
ROW_SIGNATURE.indexOf("node_roles");
+ private static final int PROPERTY_INDEX = ROW_SIGNATURE.indexOf("property");
+ private static final int VALUE_INDEX = ROW_SIGNATURE.indexOf("value");
+ private static final int ERROR_MESSAGE_INDEX =
ROW_SIGNATURE.indexOf("error_message");
+
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final AuthorizerMapper authorizerMapper;
private final HttpClient httpClient;
@@ -108,42 +127,126 @@ public class SystemServerPropertiesTable extends
AbstractTable implements Scanna
}
@Override
- public Enumerable<Object[]> scan(DataContext root)
+ public Enumerable<Object[]> scan(
+ final DataContext root,
+ final List<RexNode> filters,
+ @Nullable final int[] projects
+ )
{
final AuthenticationResult authenticationResult = (AuthenticationResult)
Preconditions.checkNotNull(
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
"authenticationResult in dataContext"
);
SystemSchema.checkStateReadAccessForServers(authenticationResult,
authorizerMapper);
+
+ // Extract equality filters to skip fetching properties from non-matching
servers.
+ final Map<Integer, Set<String>> columnFilters =
extractColumnEqualityFilters(filters, SERVER_INDEX, SERVICE_NAME_INDEX);
+ final Set<String> serverFilter = columnFilters.get(SERVER_INDEX);
+ final Set<String> serviceNameFilter =
columnFilters.get(SERVICE_NAME_INDEX);
+
final Iterator<DiscoveryDruidNode> druidServers =
SystemSchema.getDruidServers(druidNodeDiscoveryProvider);
final Map<String, ServerProperties> serverToPropertiesMap = new
HashMap<>();
druidServers.forEachRemaining(discoveryDruidNode -> {
final DruidNode druidNode = discoveryDruidNode.getDruidNode();
- final Map<String, String> propertiesMap = getProperties(druidNode);
- if (serverToPropertiesMap.containsKey(druidNode.getHostAndPortToUse())) {
- ServerProperties serverProperties =
serverToPropertiesMap.get(druidNode.getHostAndPortToUse());
-
serverProperties.addNodeRole(discoveryDruidNode.getNodeRole().getJsonName());
+ final String nodeRole = discoveryDruidNode.getNodeRole().getJsonName();
+
+ final String serverKey = druidNode.getHostAndPortToUse();
+
+ if (serverFilter != null && !serverFilter.contains(serverKey)) {
+ return;
+ }
+ if (serviceNameFilter != null &&
!serviceNameFilter.contains(druidNode.getServiceName())) {
+ return;
+ }
+
+ final ServerProperties serverProperties =
serverToPropertiesMap.get(serverKey);
+ if (serverProperties != null) {
+ serverProperties.addNodeRole(nodeRole);
} else {
serverToPropertiesMap.put(
- druidNode.getHostAndPortToUse(),
+ serverKey,
new ServerProperties(
- druidNode.getServiceName(),
- druidNode.getHostAndPortToUse(),
- new
ArrayList<>(Arrays.asList(discoveryDruidNode.getNodeRole().getJsonName())),
- propertiesMap
- )
+ druidNode.getServiceName(),
+ serverKey,
+ new ArrayList<>(Arrays.asList(nodeRole)),
+ druidNode
+ )
);
}
});
- ArrayList<Object[]> rows = new ArrayList<>();
+
+ final List<Object[]> rows = new ArrayList<>();
for (ServerProperties serverProperties : serverToPropertiesMap.values()) {
- rows.addAll(serverProperties.toRows());
+ rows.addAll(serverProperties.buildRows(this, projects));
}
return Linq4j.asEnumerable(rows);
}
- private Map<String, String> getProperties(DruidNode druidNode)
+ /**
+ * Extracts simple equality filters ({@code column = 'literal'}) for the
specified columns.
+ * Only handles top-level AND equalities; any other predicate (!=, LIKE, OR,
functions) is
+ * ignored and left for Calcite to apply as a post-filter.
+ *
+ * @return map from column index to the set of literal values; absent key
means no filter for that column
+ */
+ private static Map<Integer, Set<String>> extractColumnEqualityFilters(final
List<RexNode> filters, final int... columnIndices)
+ {
+ final Map<Integer, Set<String>> result = new HashMap<>();
+ for (final RexNode filter : filters) {
+ for (final int columnIndex : columnIndices) {
+ final String value = extractEqualityOnColumn(filter, columnIndex);
+ if (value != null) {
+ result.computeIfAbsent(columnIndex, k -> new HashSet<>()).add(value);
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns the string literal value if the node is a simple {@code column =
'literal'} (or reversed) equality
+ * on the given column index. Returns null for anything else — Calcite
handles those as post-filters.
+ */
+ @Nullable
+ private static String extractEqualityOnColumn(final RexNode node, final int
columnIndex)
+ {
+ if (!(node instanceof RexCall)) {
+ return null;
+ }
+ final RexCall call = (RexCall) node;
+ if (call.getKind() != SqlKind.EQUALS) {
+ return null;
+ }
+ final RexNode left = call.getOperands().get(0);
+ final RexNode right = call.getOperands().get(1);
+
+ if (left instanceof RexInputRef && right instanceof RexLiteral) {
+ if (((RexInputRef) left).getIndex() == columnIndex) {
+ return RexLiteral.stringValue(right);
+ }
+ } else if (right instanceof RexInputRef && left instanceof RexLiteral) {
+ if (((RexInputRef) right).getIndex() == columnIndex) {
+ return RexLiteral.stringValue(left);
+ }
+ }
+ return null;
+ }
+
+ private static Object[] projectRow(final Object[] row, @Nullable final int[]
projects)
+ {
+ if (projects == null) {
+ return row;
+ }
+ final Object[] projectedRow = new Object[projects.length];
+ for (int i = 0; i < projects.length; i++) {
+ projectedRow[i] = row[projects[i]];
+ }
+ return projectedRow;
+ }
+
+ private PropertiesResult getProperties(DruidNode druidNode)
{
final String url =
druidNode.getUriToUse().resolve("/status/properties").toString();
try {
@@ -154,20 +257,38 @@ public class SystemServerPropertiesTable extends
AbstractTable implements Scanna
.get();
if (response.getStatus().getCode() != HttpServletResponse.SC_OK) {
- throw new RE(
- "Failed to get properties from node[%s]. Error code[%d],
description[%s].",
- url,
- response.getStatus().getCode(),
- response.getStatus().getReasonPhrase()
- );
+ final String errorMsg = StringUtils.format("HTTP %d: %s",
+
response.getStatus().getCode(),
+
response.getStatus().getReasonPhrase());
+ log.warn("Failed to get properties from node[%s]: error[%s]", url,
errorMsg);
+ return new PropertiesResult(new HashMap<>(), errorMsg);
}
- return jsonMapper.readValue(
- response.getContent(),
- new TypeReference<>(){}
+ return new PropertiesResult(
+ jsonMapper.readValue(response.getContent(), new TypeReference<>(){}),
+ null
);
}
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(StringUtils.format("Interrupted while
fetching properties from node[%s]", url), e);
+ }
catch (Exception e) {
- throw InternalServerError.exception(e, "HTTP request to[%s] failed",
url);
+ final String errorMsg = e.getMessage() != null ? e.getMessage() :
e.getClass().getSimpleName();
+ log.warn(e, "Failed to get properties from node[%s]", url);
+ return new PropertiesResult(new HashMap<>(), errorMsg);
+ }
+ }
+
+ private static class PropertiesResult
+ {
+ final Map<String, String> properties;
+ @Nullable
+ final String error;
+
+ PropertiesResult(Map<String, String> properties, @Nullable String error)
+ {
+ this.properties = properties;
+ this.error = error;
}
}
@@ -176,14 +297,19 @@ public class SystemServerPropertiesTable extends
AbstractTable implements Scanna
final String serviceName;
final String server;
final List<String> nodeRoles;
- final Map<String, String> properties;
+ final DruidNode druidNode;
- public ServerProperties(String serviceName, String server, List<String>
nodeRoles, Map<String, String> properties)
+ public ServerProperties(
+ String serviceName,
+ String server,
+ List<String> nodeRoles,
+ DruidNode druidNode
+ )
{
this.serviceName = serviceName;
this.server = server;
this.nodeRoles = nodeRoles;
- this.properties = properties;
+ this.druidNode = druidNode;
}
public void addNodeRole(String nodeRole)
@@ -191,10 +317,39 @@ public class SystemServerPropertiesTable extends
AbstractTable implements Scanna
nodeRoles.add(nodeRole);
}
- public List<Object[]> toRows()
+ private List<Object[]> buildRows(
+ final SystemServerPropertiesTable table,
+ @Nullable final int[] projects
+ )
{
- String nodeRolesString = nodeRoles.toString();
- return properties.entrySet().stream().map(entry -> new Object[]{server,
serviceName, nodeRolesString, entry.getKey(),
entry.getValue()}).collect(Collectors.toList());
+ final String nodeRolesString = nodeRoles.toString();
+ final PropertiesResult result = table.getProperties(druidNode);
+ final Map<String, String> properties = result.properties;
+ final String error = result.error;
+
+ if (properties.isEmpty()) {
+ final Object[] row = new Object[ROW_SIGNATURE.size()];
+ row[SERVER_INDEX] = server;
+ row[SERVICE_NAME_INDEX] = serviceName;
+ row[NODE_ROLES_INDEX] = nodeRolesString;
+ row[PROPERTY_INDEX] = null;
+ row[VALUE_INDEX] = null;
+ row[ERROR_MESSAGE_INDEX] = error;
+ return Collections.singletonList(projectRow(row, projects));
+ }
+
+ return properties.entrySet().stream()
+ .map(entry -> {
+ final Object[] row = new Object[ROW_SIGNATURE.size()];
+ row[SERVER_INDEX] = server;
+ row[SERVICE_NAME_INDEX] = serviceName;
+ row[NODE_ROLES_INDEX] = nodeRolesString;
+ row[PROPERTY_INDEX] = entry.getKey();
+ row[VALUE_INDEX] = entry.getValue();
+ row[ERROR_MESSAGE_INDEX] = error;
+ return projectRow(row, projects);
+ })
+ .collect(Collectors.toList());
}
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index bb381b18718..498d02c3929 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import junitparams.converters.Nullable;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -34,8 +35,11 @@ import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
@@ -151,6 +155,10 @@ public class SystemSchemaTest extends CalciteTestBase
{
private static final ObjectMapper MAPPER = CalciteTests.getJsonMapper();
+ private static final int SERVER_INDEX =
SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("server");
+ private static final int SERVICE_NAME_INDEX =
SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("service_name");
+ private static final int PROPERTY_INDEX =
SystemServerPropertiesTable.ROW_SIGNATURE.indexOf("property");
+
private static final String DATASOURCE_ALL_ACCESS = "allAccess";
private static final BrokerSegmentMetadataCacheConfig
SEGMENT_CACHE_CONFIG_DEFAULT = BrokerSegmentMetadataCacheConfig.create();
@@ -596,7 +604,7 @@ public class SystemSchemaTest extends CalciteTestBase
.get("server_properties");
final RelDataType propertiesRowType = propertiesTable.getRowType(new
JavaTypeFactoryImpl());
final List<RelDataTypeField> propertiesFields =
propertiesRowType.getFieldList();
- Assert.assertEquals(5, propertiesFields.size());
+ Assert.assertEquals(6, propertiesFields.size());
}
@Test
@@ -1586,7 +1594,9 @@ public class SystemSchemaTest extends CalciteTestBase
coordinator.getDruidNode().getHostAndPortToUse(),
coordinator.getDruidNode().getServiceName(),
ImmutableList.of(coordinator.getNodeRole().getJsonName()).toString(),
- "druid.test-key", "test-value"
+ "druid.test-key",
+ "test-value",
+ null
});
HttpResponse coordinator2HttpResponse = new
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
@@ -1598,7 +1608,9 @@ public class SystemSchemaTest extends CalciteTestBase
coordinator2.getDruidNode().getHostAndPortToUse(),
coordinator2.getDruidNode().getServiceName(),
ImmutableList.of(coordinator2.getNodeRole().getJsonName()).toString(),
- "druid.test-key3", "test-value3"
+ "druid.test-key3",
+ "test-value3",
+ null
});
mockNodeDiscovery(NodeRole.MIDDLE_MANAGER, middleManager);
@@ -1614,14 +1626,18 @@ public class SystemSchemaTest extends CalciteTestBase
middleManager.getDruidNode().getHostAndPortToUse(),
middleManager.getDruidNode().getServiceName(),
ImmutableList.of(middleManager.getNodeRole().getJsonName()).toString(),
- "druid.test-key", "test-value"
+ "druid.test-key",
+ "test-value",
+ null
});
expectedRows
.add(new Object[]{
middleManager.getDruidNode().getHostAndPortToUse(),
- middleManager.getDruidNode().getServiceName(),
+ middleManager.getDruidNode().getServiceName(),
ImmutableList.of(middleManager.getNodeRole().getJsonName()).toString(),
- "druid.test-key2", "test-value2"
+ "druid.test-key2",
+ "test-value2",
+ null
});
Map<String, ListenableFuture<StringFullResponseHolder>> urlToResponse =
ImmutableMap.of(
@@ -1649,7 +1665,7 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.replay(druidNodeDiscoveryProvider, responseHandler, httpClient);
DataContext dataContext = createDataContext(Users.SUPER);
- final List<Object[]> rows = propertiesTable.scan(dataContext).toList();
+ final List<Object[]> rows = propertiesTable.scan(dataContext,
Collections.emptyList(), null).toList();
expectedRows.sort((Object[] row1, Object[] row2) -> ((Comparable)
row1[0]).compareTo(row2[0]));
rows.sort((Object[] row1, Object[] row2) -> ((Comparable)
row1[0]).compareTo(row2[0]));
Assert.assertEquals(expectedRows.size(), rows.size());
@@ -1659,6 +1675,419 @@ public class SystemSchemaTest extends CalciteTestBase
}
+ @Test
+ public void testPropertiesTable_withUnreachableServer()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator);
+
+ // Mock HTTP client to throw exception (connection refused)
+ EasyMock.expect(
+ httpClient.go(
+ EasyMock.isA(Request.class),
+ EasyMock.isA(StringFullResponseHandler.class)
+ )
+ ).andThrow(new RuntimeException("Connection refused")).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = propertiesTable.scan(dataContext,
Collections.emptyList(), null).toList();
+
+ // Should return 1 row even though properties fetch failed
+ Assert.assertEquals(1, rows.size());
+
+ // Verify server info is present
+ Assert.assertEquals(coordinator.getDruidNode().getHostAndPortToUse(),
rows.get(0)[0]);
+ Assert.assertEquals(coordinator.getDruidNode().getServiceName(),
rows.get(0)[1]);
+
+ // Property and value should be null
+ Assert.assertNull(rows.get(0)[3]);
+ Assert.assertNull(rows.get(0)[4]);
+
+ // Error column (index 5) should contain error message
+ Assert.assertNotNull(rows.get(0)[5]);
+ String error = (String) rows.get(0)[5];
+ Assert.assertTrue("Error should mention connection refused",
error.contains("Connection refused"));
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_withHttpError()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator);
+
+ // Mock HTTP client to return 503 error
+ HttpResponse errorHttpResponse = new
DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.SERVICE_UNAVAILABLE);
+ StringFullResponseHolder errorResponseHolder = new
StringFullResponseHolder(errorHttpResponse, StandardCharsets.UTF_8);
+ errorResponseHolder.addChunk("Service temporarily unavailable");
+
+ EasyMock.expect(
+ httpClient.go(
+ EasyMock.isA(Request.class),
+ EasyMock.isA(StringFullResponseHandler.class)
+ )
+ ).andReturn(Futures.immediateFuture(errorResponseHolder)).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = propertiesTable.scan(dataContext,
Collections.emptyList(), null).toList();
+
+ Assert.assertEquals(1, rows.size());
+
+ // Error column should contain HTTP status
+ Assert.assertNotNull(rows.get(0)[5]);
+ String error = (String) rows.get(0)[5];
+ Assert.assertTrue("Error should mention HTTP 503", error.contains("503"));
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_filterPushdown()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator, coordinator2);
+
+ // coordinator (localhost:8081, service "s1") will be fetched;
coordinator2 (localhost:8181, service "s1") will not
+ HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder = new StringFullResponseHolder(resp,
StandardCharsets.UTF_8);
+ holder.addChunk("{\"druid.key\": \"val\"}");
+
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andReturn(Futures.immediateFuture(holder)).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
+ final RelDataType rowType = propertiesTable.getRowType(new
JavaTypeFactoryImpl());
+
+ // server = 'localhost:8081' — only coordinator matches, coordinator2
skipped (1 HTTP call)
+ final RexNode serverEquality = rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(),
SERVER_INDEX),
+ rexBuilder.makeLiteral("localhost:8081")
+ );
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = propertiesTable.scan(dataContext,
ImmutableList.of(serverEquality), null).toList();
+
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals("localhost:8081", rows.get(0)[0]);
+ Assert.assertEquals("druid.key", rows.get(0)[3]);
+ Assert.assertEquals("val", rows.get(0)[4]);
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_filterPushdownServiceNameAndNonMatching()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator, coordinator2);
+
+ // Both coordinators have service "s1", so both match service_name filter
+ HttpResponse resp1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder1 = new StringFullResponseHolder(resp1,
StandardCharsets.UTF_8);
+ holder1.addChunk("{\"k1\": \"v1\"}");
+
+ HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2,
StandardCharsets.UTF_8);
+ holder2.addChunk("{\"k2\": \"v2\"}");
+
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andAnswer(() -> {
+ Request req = (Request) EasyMock.getCurrentArguments()[0];
+ String url = req.getUrl().toString();
+ if (url.contains("8081")) {
+ return Futures.immediateFuture(holder1);
+ } else {
+ return Futures.immediateFuture(holder2);
+ }
+ }).times(2);
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
+ final RelDataType rowType = propertiesTable.getRowType(new
JavaTypeFactoryImpl());
+
+ // service_name = 's1' — both coordinators match
+ final RexNode serviceNameEquality = rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVICE_NAME_INDEX).getType(),
SERVICE_NAME_INDEX),
+ rexBuilder.makeLiteral("s1")
+ );
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ List<Object[]> rows = propertiesTable.scan(dataContext,
ImmutableList.of(serviceNameEquality), null).toList();
+ Assert.assertEquals(2, rows.size());
+
+ // Non-matching server filter returns 0 rows with no HTTP calls
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ EasyMock.reset(druidNodeDiscoveryProvider, httpClient);
+ mockAllNodeRolesWithCoordinator(coordinator);
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexNode nonMatchingFilter = rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(),
SERVER_INDEX),
+ rexBuilder.makeLiteral("nonexistent:9999")
+ );
+
+ dataContext = createDataContext(Users.SUPER);
+ rows = propertiesTable.scan(dataContext,
ImmutableList.of(nonMatchingFilter), null).toList();
+ Assert.assertEquals(0, rows.size());
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_filterFallback()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ final RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
+ final RelDataType rowType = propertiesTable.getRowType(new
JavaTypeFactoryImpl());
+
+ // 1) NOT_EQUALS is not pushed down — all rows returned
+ mockAllNodeRolesWithCoordinator(coordinator);
+ HttpResponse resp1 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder1 = new StringFullResponseHolder(resp1,
StandardCharsets.UTF_8);
+ holder1.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}");
+ EasyMock.expect(httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(holder1)).once();
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexNode notEquals = rexBuilder.makeCall(
+ SqlStdOperatorTable.NOT_EQUALS,
+
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(),
SERVER_INDEX),
+ rexBuilder.makeLiteral("some-server:1234")
+ );
+ Assert.assertEquals(2,
propertiesTable.scan(createDataContext(Users.SUPER),
ImmutableList.of(notEquals), null).toList().size());
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+
+ // 2) Non-RexCall filter (bare RexInputRef) is ignored
+ EasyMock.reset(druidNodeDiscoveryProvider, httpClient);
+ mockAllNodeRolesWithCoordinator(coordinator);
+ HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2,
StandardCharsets.UTF_8);
+ holder2.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}");
+ EasyMock.expect(httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(holder2)).once();
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexNode inputRef =
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(),
SERVER_INDEX);
+ Assert.assertEquals(2,
propertiesTable.scan(createDataContext(Users.SUPER),
ImmutableList.of(inputRef), null).toList().size());
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+
+ // 3) Equality on non-pushed column (property) is ignored
+ EasyMock.reset(druidNodeDiscoveryProvider, httpClient);
+ mockAllNodeRolesWithCoordinator(coordinator);
+ HttpResponse resp3 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder3 = new StringFullResponseHolder(resp3,
StandardCharsets.UTF_8);
+ holder3.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}");
+ EasyMock.expect(httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(holder3)).once();
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexNode propertyEquality = rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+
rexBuilder.makeInputRef(rowType.getFieldList().get(PROPERTY_INDEX).getType(),
PROPERTY_INDEX),
+ rexBuilder.makeLiteral("druid.key")
+ );
+ Assert.assertEquals(2,
propertiesTable.scan(createDataContext(Users.SUPER),
ImmutableList.of(propertyEquality), null).toList().size());
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+
+ // 4) Reversed equality ('localhost:8081' = server) is correctly extracted
+ EasyMock.reset(druidNodeDiscoveryProvider, httpClient);
+ mockAllNodeRolesWithCoordinator(coordinator);
+ HttpResponse resp4 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder4 = new StringFullResponseHolder(resp4,
StandardCharsets.UTF_8);
+ holder4.addChunk("{\"druid.key\": \"val\", \"druid.other\": \"other\"}");
+ EasyMock.expect(httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class)))
+ .andReturn(Futures.immediateFuture(holder4)).once();
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final RexNode reversedEquality = rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS,
+ rexBuilder.makeLiteral("localhost:8081"),
+
rexBuilder.makeInputRef(rowType.getFieldList().get(SERVER_INDEX).getType(),
SERVER_INDEX)
+ );
+ List<Object[]> rows = propertiesTable.scan(createDataContext(Users.SUPER),
ImmutableList.of(reversedEquality), null).toList();
+ Assert.assertEquals(2, rows.size());
+ Assert.assertEquals("localhost:8081", rows.get(0)[0]);
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_projectionAndMultiRole()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ // Same host:port under two roles
+ DiscoveryDruidNode coordinatorRole = new DiscoveryDruidNode(
+ new DruidNode("s1", "localhost", false, 8081, null, true, false),
+ NodeRole.COORDINATOR,
+ ImmutableMap.of(),
+ startTime
+ );
+ DiscoveryDruidNode overlordRole = new DiscoveryDruidNode(
+ new DruidNode("s1", "localhost", false, 8081, null, true, false),
+ NodeRole.OVERLORD,
+ ImmutableMap.of(),
+ startTime
+ );
+
+ mockNodeDiscovery(NodeRole.BROKER);
+ mockNodeDiscovery(NodeRole.ROUTER);
+ mockNodeDiscovery(NodeRole.HISTORICAL);
+ mockNodeDiscovery(NodeRole.OVERLORD, overlordRole);
+ mockNodeDiscovery(NodeRole.PEON);
+ mockNodeDiscovery(NodeRole.INDEXER);
+ mockNodeDiscovery(NodeRole.MIDDLE_MANAGER);
+ mockNodeDiscovery(NodeRole.COORDINATOR, coordinatorRole);
+
+ HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder = new StringFullResponseHolder(resp,
StandardCharsets.UTF_8);
+ holder.addChunk("{\"druid.port\": \"8081\"}");
+
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andReturn(Futures.immediateFuture(holder)).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+
+ // Multi-role: only 1 HTTP call, node_roles contains both
+ final List<Object[]> fullRows = propertiesTable.scan(dataContext,
Collections.emptyList(), null).toList();
+ Assert.assertEquals(1, fullRows.size());
+ String nodeRoles = (String) fullRows.get(0)[2];
+ Assert.assertTrue(nodeRoles.contains("coordinator"));
+ Assert.assertTrue(nodeRoles.contains("overlord"));
+
+ // Projection: project only server (0) and property (3)
+ EasyMock.reset(druidNodeDiscoveryProvider, httpClient);
+ mockAllNodeRolesWithCoordinator(coordinator);
+
+ HttpResponse resp2 = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
+ StringFullResponseHolder holder2 = new StringFullResponseHolder(resp2,
StandardCharsets.UTF_8);
+ holder2.addChunk("{\"druid.port\": \"8081\"}");
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andReturn(Futures.immediateFuture(holder2)).once();
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ final int[] projects = new int[]{0, 3};
+ final List<Object[]> projectedRows = propertiesTable.scan(dataContext,
Collections.emptyList(), projects).toList();
+ Assert.assertEquals(1, projectedRows.size());
+ Assert.assertEquals(2, projectedRows.get(0).length);
+ Assert.assertEquals("localhost:8081", projectedRows.get(0)[0]);
+ Assert.assertEquals("druid.port", projectedRows.get(0)[1]);
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_withInterruptedException() throws Exception
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator);
+
+ SettableFuture<StringFullResponseHolder> interruptingFuture =
SettableFuture.create();
+
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andReturn(interruptingFuture).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ Thread.currentThread().interrupt();
+ RuntimeException ex = Assert.assertThrows(
+ RuntimeException.class,
+ () -> propertiesTable.scan(dataContext, Collections.emptyList(),
null).toList()
+ );
+ Assert.assertTrue(ex.getMessage().contains("Interrupted"));
+ Assert.assertTrue(Thread.currentThread().isInterrupted());
+ Thread.interrupted();
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
+ @Test
+ public void testPropertiesTable_exceptionWithNullMessage()
+ {
+ SystemServerPropertiesTable propertiesTable = new
SystemServerPropertiesTable(
+ druidNodeDiscoveryProvider,
+ authMapper,
+ httpClient,
+ MAPPER
+ );
+
+ mockAllNodeRolesWithCoordinator(coordinator);
+
+ // Exception with no message — error_message should fall back to class
simple name
+ EasyMock.expect(
+ httpClient.go(EasyMock.isA(Request.class),
EasyMock.isA(StringFullResponseHandler.class))
+ ).andThrow(new RuntimeException((String) null)).once();
+
+ EasyMock.replay(druidNodeDiscoveryProvider, httpClient);
+
+ DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = propertiesTable.scan(dataContext,
Collections.emptyList(), null).toList();
+
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals("RuntimeException", rows.get(0)[5]);
+
+ EasyMock.verify(druidNodeDiscoveryProvider, httpClient);
+ }
+
@Test
public void testQueriesTable()
{
@@ -1975,6 +2404,18 @@ public class SystemSchemaTest extends CalciteTestBase
return druidNodeDiscovery;
}
+ private void mockAllNodeRolesWithCoordinator(DiscoveryDruidNode...
coordinators)
+ {
+ mockNodeDiscovery(NodeRole.BROKER);
+ mockNodeDiscovery(NodeRole.ROUTER);
+ mockNodeDiscovery(NodeRole.HISTORICAL);
+ mockNodeDiscovery(NodeRole.OVERLORD);
+ mockNodeDiscovery(NodeRole.PEON);
+ mockNodeDiscovery(NodeRole.INDEXER);
+ mockNodeDiscovery(NodeRole.MIDDLE_MANAGER);
+ mockNodeDiscovery(NodeRole.COORDINATOR, coordinators);
+ }
+
/**
* Usernames to be used in tests.
*/
diff --git a/website/.spelling b/website/.spelling
index 877653a3471..c85463563e1 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -718,6 +718,7 @@ druid.sql.planner.useApproximateTopN
druid.sql.planner.useLexicographicTopN
druid.sql.planner.useGroupingSetForExactDistinct
druid.sql.planner.useNativeQueryExplain
+error_message
error_msg
exprs
group_id
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]