xiangfu0 commented on code in PR #17357:
URL: https://github.com/apache/pinot/pull/17357#discussion_r3009116147
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,11 +140,215 @@ public BrokerResponse
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
return result;
}
+ /**
+ * Execute metadata (SHOW/USE) statements.
+ */
+ public BrokerResponse executeMetadataStatement(SqlNodeAndOptions
sqlNodeAndOptions,
+ @Nullable Map<String, String> headers) {
+ BrokerResponseNative result = new BrokerResponseNative();
+ try {
+ ResultTable resultTable =
+ buildMetadataResult(sqlNodeAndOptions.getSqlNode(),
sqlNodeAndOptions.getOptions(), headers);
+ result.setResultTable(resultTable);
+ } catch (Exception e) {
+ result.addException(new
QueryProcessingException(QueryErrorCode.QUERY_EXECUTION, e.getMessage()));
+ }
+ return result;
+ }
+
private MinionClient getMinionClient() {
// NOTE: using null auth provider here as auth headers injected by caller
in "executeDMLStatement()"
if (_helixManager != null) {
return new MinionClient(getControllerBaseUrl(_helixManager), null);
}
return new MinionClient(_controllerUrl, null);
}
+
+ private ResultTable buildMetadataResult(SqlNode sqlNode, Map<String, String>
options,
+ @Nullable Map<String, String> headers)
+ throws Exception {
+ if (sqlNode instanceof SqlShowDatabases) {
+ return toSingleStringColumnResult("databaseName",
fetchDatabases(headers));
+ }
+ if (sqlNode instanceof SqlShowTables) {
+ SqlShowTables showTables = (SqlShowTables) sqlNode;
+ String database = resolveDatabase(showTables.getDatabaseName(), options,
headers);
+ List<String> tables = stripDatabasePrefix(fetchTables(database,
headers), database);
+ return toSingleStringColumnResult("tableName", tables);
+ }
+ if (sqlNode instanceof SqlShowSchemas) {
+ SqlShowSchemas showSchemas = (SqlShowSchemas) sqlNode;
+ String database = resolveDatabase(null, options, headers);
+ List<String> schemas = stripDatabasePrefix(fetchSchemas(database,
headers), database);
+ String likePattern = getLikePattern(showSchemas.getLikePattern());
+ schemas = applyLikeFilter(schemas, likePattern);
+ return toSingleStringColumnResult("schemaName", schemas);
+ }
+ throw new UnsupportedOperationException("Unsupported METADATA SqlKind - "
+ sqlNode.getKind());
+ }
+
+ private ResultTable toSingleStringColumnResult(String columnName,
List<String> values) {
+ DataSchema dataSchema = new DataSchema(new String[]{columnName}, new
ColumnDataType[]{ColumnDataType.STRING});
+ List<Object[]> rows = values.stream().map(value -> new
Object[]{value}).collect(Collectors.toList());
+ return new ResultTable(dataSchema, rows);
+ }
+
+ private String resolveDatabase(@Nullable SqlIdentifier explicitDatabase,
Map<String, String> options,
+ @Nullable Map<String, String> headers)
+ throws DatabaseConflictException {
+ String databaseFromSql = explicitDatabase != null ?
explicitDatabase.toString() : null;
+ String databaseFromOptions = options.get(CommonConstants.DATABASE);
+ String databaseFromHeaders = getHeaderValue(headers,
CommonConstants.DATABASE);
+
+ if (databaseFromSql != null) {
+ if (databaseFromOptions != null &&
!databaseFromSql.equalsIgnoreCase(databaseFromOptions)) {
+ StringBuilder message = new StringBuilder("Database name
'").append(databaseFromSql)
+ .append("' from statement does not match database name
'").append(databaseFromOptions)
+ .append("' from query options");
+ if (databaseFromHeaders != null) {
+ message.append(" (request header database name:
'").append(databaseFromHeaders).append("')");
+ }
+ throw new DatabaseConflictException(message.toString());
+ }
+ if (databaseFromHeaders != null &&
!databaseFromSql.equalsIgnoreCase(databaseFromHeaders)) {
+ StringBuilder message = new StringBuilder("Database name
'").append(databaseFromSql)
+ .append("' from statement does not match database name
'").append(databaseFromHeaders)
+ .append("' from request header");
+ if (databaseFromOptions != null) {
+ message.append(" (query options database name:
'").append(databaseFromOptions).append("')");
+ }
+ throw new DatabaseConflictException(message.toString());
+ }
+ return databaseFromSql;
+ }
+
+ if (databaseFromOptions != null && databaseFromHeaders != null
+ && !databaseFromOptions.equalsIgnoreCase(databaseFromHeaders)) {
+ throw new DatabaseConflictException("Database name '" +
databaseFromHeaders + "' from request header does not "
+ + "match database name '" + databaseFromOptions + "' from query
options");
+ }
+
+ return databaseFromOptions != null ? databaseFromOptions
+ : databaseFromHeaders != null ? databaseFromHeaders :
CommonConstants.DEFAULT_DATABASE;
+ }
+
+ private List<String> stripDatabasePrefix(List<String> names, String
database) {
+ if (names.isEmpty()) {
+ return names;
+ }
+ return names.stream()
+ .map(name -> DatabaseUtils.isPartOfDatabase(name, database)
+ ? DatabaseUtils.removeDatabasePrefix(name, database)
+ : name)
+ .collect(Collectors.toList());
+ }
+
+ private List<String> applyLikeFilter(List<String> values, @Nullable String
likePattern) {
+ if (StringUtils.isEmpty(likePattern)) {
+ return values;
+ }
+ Pattern pattern = buildLikeRegex(likePattern);
+ return values.stream().filter(value ->
pattern.matcher(value).matches()).collect(Collectors.toList());
+ }
+
+ private Pattern buildLikeRegex(String likePattern) {
+ StringBuilder regex = new StringBuilder();
+ boolean escaped = false;
+ for (char c : likePattern.toCharArray()) {
+ if (escaped) {
+ regex.append(Pattern.quote(String.valueOf(c)));
+ escaped = false;
+ } else if (c == '\\') {
+ escaped = true;
+ } else if (c == '%') {
+ regex.append(".*");
+ } else if (c == '_') {
+ regex.append('.');
+ } else {
+ regex.append(Pattern.quote(String.valueOf(c)));
+ }
+ }
+ return Pattern.compile(regex.toString(), Pattern.CASE_INSENSITIVE);
+ }
+
+ private String getLikePattern(@Nullable SqlLiteral likeLiteral) {
+ return likeLiteral == null ? null : likeLiteral.toValue();
+ }
+
+ private List<String> fetchDatabases(@Nullable Map<String, String> headers)
+ throws IOException {
+ try (PinotAdminClient adminClient = createAdminClient(headers)) {
+ return adminClient.getDatabaseClient().listDatabaseNames();
+ } catch (Exception e) {
+ throw new IOException("Failed to fetch databases from controller", e);
+ }
+ }
Review Comment:
Updated in e053d09b52. The metadata fetch helpers now preserve IOException
directly and only wrap unexpected exceptions with contextual messages, so the
original I/O failure details are retained.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -199,6 +201,14 @@ protected List<FieldConfig> getFieldConfigs() {
return fieldConfigs;
}
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ super.overrideBrokerConf(brokerConf);
+ // Use IPv4 loopback explicitly to avoid 'localhost' IPv6 resolution
differences across environments for broker ->
Review Comment:
I re-investigated this while reproducing the failing IT. The actual SHOW
TABLES failure was broker-side: we were replaying inbound transport headers
like Host/Connection to the controller metadata endpoint, which could make the
controller return 400. That is fixed in e053d09b52 by filtering non-forwardable
headers before broker-to-controller metadata calls. I left the explicit
127.0.0.1 override here because this test compares against a direct admin
client and I want the controller address advertised in Helix to stay
explicit/deterministic, but it was not the root cause of the failing Actions
run.
##########
pinot-plugins/assembly-descriptor/pom.xml:
##########
@@ -77,4 +77,19 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>skip-invoker-its-when-skipTests</id>
Review Comment:
Updated in e053d09b52. The root PR build uses -DskipTests, but
maven-invoker-plugin does not honor that flag by itself. This profile
propagates that intent to invoker.skip so the assembly-descriptor
sample-project invoker ITs are skipped in those builds as well.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,11 +140,215 @@ public BrokerResponse
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
return result;
}
+ /**
+ * Execute metadata (SHOW/USE) statements.
+ */
+ public BrokerResponse executeMetadataStatement(SqlNodeAndOptions
sqlNodeAndOptions,
+ @Nullable Map<String, String> headers) {
+ BrokerResponseNative result = new BrokerResponseNative();
+ try {
+ ResultTable resultTable =
+ buildMetadataResult(sqlNodeAndOptions.getSqlNode(),
sqlNodeAndOptions.getOptions(), headers);
+ result.setResultTable(resultTable);
+ } catch (Exception e) {
+ result.addException(new
QueryProcessingException(QueryErrorCode.QUERY_EXECUTION, e.getMessage()));
+ }
+ return result;
+ }
+
private MinionClient getMinionClient() {
// NOTE: using null auth provider here as auth headers injected by caller
in "executeDMLStatement()"
if (_helixManager != null) {
return new MinionClient(getControllerBaseUrl(_helixManager), null);
}
return new MinionClient(_controllerUrl, null);
}
+
+ private ResultTable buildMetadataResult(SqlNode sqlNode, Map<String, String>
options,
+ @Nullable Map<String, String> headers)
+ throws Exception {
+ if (sqlNode instanceof SqlShowDatabases) {
+ return toSingleStringColumnResult("databaseName",
fetchDatabases(headers));
+ }
+ if (sqlNode instanceof SqlShowTables) {
+ SqlShowTables showTables = (SqlShowTables) sqlNode;
+ String database = resolveDatabase(showTables.getDatabaseName(), options,
headers);
+ List<String> tables = stripDatabasePrefix(fetchTables(database,
headers), database);
+ return toSingleStringColumnResult("tableName", tables);
+ }
+ if (sqlNode instanceof SqlShowSchemas) {
+ SqlShowSchemas showSchemas = (SqlShowSchemas) sqlNode;
+ String database = resolveDatabase(null, options, headers);
+ List<String> schemas = stripDatabasePrefix(fetchSchemas(database,
headers), database);
+ String likePattern = getLikePattern(showSchemas.getLikePattern());
+ schemas = applyLikeFilter(schemas, likePattern);
+ return toSingleStringColumnResult("schemaName", schemas);
+ }
+ throw new UnsupportedOperationException("Unsupported METADATA SqlKind - "
+ sqlNode.getKind());
+ }
+
+ private ResultTable toSingleStringColumnResult(String columnName,
List<String> values) {
+ DataSchema dataSchema = new DataSchema(new String[]{columnName}, new
ColumnDataType[]{ColumnDataType.STRING});
+ List<Object[]> rows = values.stream().map(value -> new
Object[]{value}).collect(Collectors.toList());
+ return new ResultTable(dataSchema, rows);
+ }
+
+ private String resolveDatabase(@Nullable SqlIdentifier explicitDatabase,
Map<String, String> options,
+ @Nullable Map<String, String> headers)
+ throws DatabaseConflictException {
+ String databaseFromSql = explicitDatabase != null ?
explicitDatabase.toString() : null;
+ String databaseFromOptions = options.get(CommonConstants.DATABASE);
+ String databaseFromHeaders = getHeaderValue(headers,
CommonConstants.DATABASE);
+
+ if (databaseFromSql != null) {
+ if (databaseFromOptions != null &&
!databaseFromSql.equalsIgnoreCase(databaseFromOptions)) {
+ StringBuilder message = new StringBuilder("Database name
'").append(databaseFromSql)
+ .append("' from statement does not match database name
'").append(databaseFromOptions)
+ .append("' from query options");
+ if (databaseFromHeaders != null) {
+ message.append(" (request header database name:
'").append(databaseFromHeaders).append("')");
+ }
+ throw new DatabaseConflictException(message.toString());
+ }
+ if (databaseFromHeaders != null &&
!databaseFromSql.equalsIgnoreCase(databaseFromHeaders)) {
+ StringBuilder message = new StringBuilder("Database name
'").append(databaseFromSql)
+ .append("' from statement does not match database name
'").append(databaseFromHeaders)
+ .append("' from request header");
+ if (databaseFromOptions != null) {
+ message.append(" (query options database name:
'").append(databaseFromOptions).append("')");
+ }
+ throw new DatabaseConflictException(message.toString());
+ }
+ return databaseFromSql;
+ }
+
+ if (databaseFromOptions != null && databaseFromHeaders != null
+ && !databaseFromOptions.equalsIgnoreCase(databaseFromHeaders)) {
+ throw new DatabaseConflictException("Database name '" +
databaseFromHeaders + "' from request header does not "
+ + "match database name '" + databaseFromOptions + "' from query
options");
+ }
+
+ return databaseFromOptions != null ? databaseFromOptions
+ : databaseFromHeaders != null ? databaseFromHeaders :
CommonConstants.DEFAULT_DATABASE;
+ }
+
+ private List<String> stripDatabasePrefix(List<String> names, String
database) {
+ if (names.isEmpty()) {
+ return names;
+ }
+ return names.stream()
+ .map(name -> DatabaseUtils.isPartOfDatabase(name, database)
+ ? DatabaseUtils.removeDatabasePrefix(name, database)
+ : name)
+ .collect(Collectors.toList());
+ }
+
+ private List<String> applyLikeFilter(List<String> values, @Nullable String
likePattern) {
+ if (StringUtils.isEmpty(likePattern)) {
+ return values;
+ }
+ Pattern pattern = buildLikeRegex(likePattern);
+ return values.stream().filter(value ->
pattern.matcher(value).matches()).collect(Collectors.toList());
+ }
+
+ private Pattern buildLikeRegex(String likePattern) {
+ StringBuilder regex = new StringBuilder();
+ boolean escaped = false;
+ for (char c : likePattern.toCharArray()) {
+ if (escaped) {
+ regex.append(Pattern.quote(String.valueOf(c)));
+ escaped = false;
+ } else if (c == '\\') {
+ escaped = true;
+ } else if (c == '%') {
+ regex.append(".*");
+ } else if (c == '_') {
+ regex.append('.');
+ } else {
+ regex.append(Pattern.quote(String.valueOf(c)));
+ }
+ }
+ return Pattern.compile(regex.toString(), Pattern.CASE_INSENSITIVE);
+ }
+
+ private String getLikePattern(@Nullable SqlLiteral likeLiteral) {
+ return likeLiteral == null ? null : likeLiteral.toValue();
+ }
+
+ private List<String> fetchDatabases(@Nullable Map<String, String> headers)
+ throws IOException {
+ try (PinotAdminClient adminClient = createAdminClient(headers)) {
+ return adminClient.getDatabaseClient().listDatabaseNames();
+ } catch (Exception e) {
+ throw new IOException("Failed to fetch databases from controller", e);
+ }
+ }
+
+ private List<String> fetchTables(String database, @Nullable Map<String,
String> headers)
+ throws IOException {
+ Map<String, String> requestHeaders = new HashMap<>();
+ if (headers != null) {
+ requestHeaders.putAll(headers);
+ }
+ requestHeaders.put(CommonConstants.DATABASE, database);
+ try (PinotAdminClient adminClient = createAdminClient(requestHeaders)) {
+ return adminClient.getTableClient().listTables(null, null, null);
+ } catch (Exception e) {
+ throw new IOException("Failed to fetch tables from controller", e);
+ }
+ }
+
+ private List<String> fetchSchemas(String database, @Nullable Map<String,
String> headers)
+ throws IOException {
+ Map<String, String> requestHeaders = new HashMap<>();
+ if (headers != null) {
+ requestHeaders.putAll(headers);
+ }
+ requestHeaders.put(CommonConstants.DATABASE, database);
+ try (PinotAdminClient adminClient = createAdminClient(requestHeaders)) {
+ return adminClient.getSchemaClient().listSchemaNames();
+ } catch (Exception e) {
+ throw new IOException("Failed to fetch schemas from controller", e);
+ }
+ }
+
+ private PinotAdminClient createAdminClient(@Nullable Map<String, String>
headers)
+ throws IOException {
+ String baseUrl = _helixManager != null ?
getControllerBaseUrl(_helixManager) : _controllerUrl;
+ if (StringUtils.isBlank(baseUrl)) {
+ throw new IOException("Controller URL is not configured for metadata
query");
+ }
+
+ String normalizedBaseUrl = StringUtils.stripEnd(baseUrl.trim(), "/");
+ String uriString = normalizedBaseUrl.contains("://")
+ ? normalizedBaseUrl
+ : CommonConstants.HTTP_PROTOCOL + "://" + normalizedBaseUrl;
+
+ URI uri = URI.create(uriString);
+ String scheme = StringUtils.defaultIfBlank(uri.getScheme(),
CommonConstants.HTTP_PROTOCOL);
+ String authority = uri.getAuthority();
+ if (StringUtils.isBlank(authority)) {
+ throw new IOException("Invalid controller URL: " + baseUrl);
+ }
+
+ // PinotAdminTransport expects "host:port" (or "host:port/<basePath>") and
uses a separate scheme property.
+ String path =
StringUtils.stripEnd(StringUtils.defaultString(uri.getPath()), "/");
+ String controllerAddress = StringUtils.isBlank(path) || "/".equals(path) ?
authority : authority + path;
+
+ Properties properties = new Properties();
+ properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, scheme);
+ return new PinotAdminClient(controllerAddress, properties, headers);
+ }
+
+ private String getHeaderValue(@Nullable Map<String, String> headers, String
key) {
+ if (headers == null || headers.isEmpty()) {
+ return null;
+ }
+ for (Map.Entry<String, String> entry : headers.entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(key)) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
Review Comment:
Updated in e053d09b52. Metadata requests now normalize headers once into a
case-insensitive map at entry, and I added a unit test covering mixed-case
database headers.
##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java:
##########
@@ -341,6 +341,19 @@ private String toJson(Object obj) {
*/
public List<String> parseStringArray(JsonNode response, String fieldName)
throws PinotAdminException {
+ if (response == null) {
+ throw new PinotAdminException("Response is null");
+ }
+
+ // Some endpoints return a root-level JSON array (e.g. GET /schemas, GET
/databases)
+ if (response.isArray()) {
+ List<String> result = new ArrayList<>();
+ for (JsonNode element : response) {
+ result.add(element.asText());
+ }
+ return result;
+ }
Review Comment:
Updated in e053d09b52. The Javadoc now explicitly documents the root-level
JSON array case in addition to the named-field extraction path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]