This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 2965c9c KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723) 2965c9c is described below commit 2965c9c7b44fef6ae953e69f07394cf3b5f6d120 Author: Shailesh Panwar <52677315+srpanwar-conflu...@users.noreply.github.com> AuthorDate: Fri May 29 15:18:50 2020 -0700 KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (#8723) Adds documentation and type of ConfigEntry in version 3 of DescribeConfigsResponse Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../apache/kafka/clients/admin/ConfigEntry.java | 39 ++++++++- .../clients/admin/DescribeConfigsOptions.java | 15 ++++ .../kafka/clients/admin/KafkaAdminClient.java | 55 +++++++++++-- .../apache/kafka/common/config/AbstractConfig.java | 7 ++ .../common/requests/DescribeConfigsRequest.java | 39 ++++++++- .../common/requests/DescribeConfigsResponse.java | 94 +++++++++++++++++++++- .../common/message/DescribeConfigsRequest.json | 10 ++- .../common/message/DescribeConfigsResponse.json | 8 +- .../org/apache/kafka/clients/admin/ConfigTest.java | 3 +- .../kafka/common/config/AbstractConfigTest.java | 19 +++++ .../kafka/common/requests/RequestResponseTest.java | 26 +++++- .../src/main/scala/kafka/server/AdminManager.scala | 43 +++++++--- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- 14 files changed, 328 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java index 42cc627..b6d947f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java @@ -37,6 +37,8 @@ public class ConfigEntry { private final boolean isSensitive; private final boolean isReadOnly; private final List<ConfigSynonym> synonyms; + private final ConfigType type; + private final String documentation; /** * Create a configuration entry with the provided values. @@ -65,7 +67,9 @@ public class ConfigEntry { isDefault ? ConfigSource.DEFAULT_CONFIG : ConfigSource.UNKNOWN, isSensitive, isReadOnly, - Collections.<ConfigSynonym>emptyList()); + Collections.<ConfigSynonym>emptyList(), + ConfigType.UNKNOWN, + null); } /** @@ -79,7 +83,7 @@ public class ConfigEntry { * @param synonyms Synonym configs in order of precedence */ ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly, - List<ConfigSynonym> synonyms) { + List<ConfigSynonym> synonyms, ConfigType type, String documentation) { Objects.requireNonNull(name, "name should not be null"); this.name = name; this.value = value; @@ -87,6 +91,8 @@ public class ConfigEntry { this.isSensitive = isSensitive; this.isReadOnly = isReadOnly; this.synonyms = synonyms; + this.type = type; + this.documentation = documentation; } /** @@ -141,6 +147,20 @@ public class ConfigEntry { return synonyms; } + /** + * Return the config data type. + */ + public ConfigType type() { + return type; + } + + /** + * Return the config documentation. + */ + public String documentation() { + return documentation; + } + @Override public boolean equals(Object o) { if (this == o) @@ -183,6 +203,21 @@ public class ConfigEntry { ")"; } + /** + * Data type of configuration entry. + */ + public enum ConfigType { + UNKNOWN, + BOOLEAN, + STRING, + INT, + SHORT, + LONG, + DOUBLE, + LIST, + CLASS, + PASSWORD + } /** * Source of configuration entries. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java index 450cb82..bfb9c18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java @@ -30,6 +30,7 @@ import java.util.Collection; public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> { private boolean includeSynonyms = false; + private boolean includeDocumentation = false; /** * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the @@ -50,6 +51,13 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio } /** + * Return true if config documentation should be returned in the response. + */ + public boolean includeDocumentation() { + return includeDocumentation; + } + + /** * Set to true if synonym configs should be returned in the response. */ public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) { @@ -57,4 +65,11 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio return this; } + /** + * Set to true if config documentation should be returned in the response. + */ + public DescribeConfigsOptions includeDocumentation(boolean includeDocumentation) { + this.includeDocumentation = includeDocumentation; + return this; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 52e5c37..a968eed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1472,7 +1472,9 @@ public class KafkaAdminClient extends AdminClient { configSource(DescribeConfigsResponse.ConfigSource.forId(config.configSource())), config.isSensitive(), config.readOnly(), - Collections.emptyList())) + Collections.emptyList(), + null, + null)) .collect(Collectors.toSet())); topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(), result.replicationFactor(), @@ -1936,7 +1938,8 @@ public class KafkaAdminClient extends AdminClient { @Override DescribeConfigsRequest.Builder createRequest(int timeoutMs) { return new DescribeConfigsRequest.Builder(unifiedRequestResources) - .includeSynonyms(options.includeSynonyms()); + .includeSynonyms(options.includeSynonyms()) + .includeDocumentation(options.includeDocumentation()); } @Override @@ -1960,7 +1963,8 @@ public class KafkaAdminClient extends AdminClient { configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(), - configSynonyms(configEntry))); + configSynonyms(configEntry), configType(configEntry.type()), + configEntry.documentation())); } future.complete(new Config(configEntries)); } @@ -1983,7 +1987,8 @@ public class KafkaAdminClient extends AdminClient { @Override DescribeConfigsRequest.Builder createRequest(int timeoutMs) { return new DescribeConfigsRequest.Builder(Collections.singleton(resource)) - .includeSynonyms(options.includeSynonyms()); + .includeSynonyms(options.includeSynonyms()) + .includeDocumentation(options.includeDocumentation()); } @Override @@ -2003,7 +2008,7 @@ public class KafkaAdminClient extends AdminClient { for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) { configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(), configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(), - configSynonyms(configEntry))); + configSynonyms(configEntry), configType(configEntry.type()), configEntry.documentation())); } brokerFuture.complete(new Config(configEntries)); } @@ -2056,6 +2061,46 @@ public class KafkaAdminClient extends AdminClient { return configSource; } + private ConfigEntry.ConfigType configType(DescribeConfigsResponse.ConfigType type) { + if (type == null) { + return ConfigEntry.ConfigType.UNKNOWN; + } + + ConfigEntry.ConfigType configType; + switch (type) { + case BOOLEAN: + configType = ConfigEntry.ConfigType.BOOLEAN; + break; + case CLASS: + configType = ConfigEntry.ConfigType.CLASS; + break; + case DOUBLE: + configType = ConfigEntry.ConfigType.DOUBLE; + break; + case INT: + configType = ConfigEntry.ConfigType.INT; + break; + case LIST: + configType = ConfigEntry.ConfigType.LIST; + break; + case LONG: + configType = ConfigEntry.ConfigType.LONG; + break; + case PASSWORD: + configType = ConfigEntry.ConfigType.PASSWORD; + break; + case SHORT: + configType = ConfigEntry.ConfigType.SHORT; + break; + case STRING: + configType = ConfigEntry.ConfigType.STRING; + break; + default: + configType = ConfigEntry.ConfigType.UNKNOWN; + } + return configType; + } + @Override @Deprecated public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) { diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e28b8c9..968c549 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -201,6 +201,13 @@ public class AbstractConfig { return configKey.type; } + public String documentationOf(String key) { + ConfigDef.ConfigKey configKey = definition.configKeys().get(key); + if (configKey == null) + return null; + return configKey.documentation; + } + public Password getPassword(String key) { return (Password) get(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 4bcc380..8ea7630 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -43,6 +43,7 @@ public class DescribeConfigsRequest extends AbstractRequest { private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; private static final String CONFIG_NAMES_KEY_NAME = "config_names"; + private static final String INCLUDE_DOCUMENTATION = "include_documentation"; private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( new Field(RESOURCE_TYPE_KEY_NAME, INT8), @@ -61,13 +62,24 @@ public class DescribeConfigsRequest extends AbstractRequest { */ private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = DESCRIBE_CONFIGS_REQUEST_V1; + private static final Schema DESCRIBE_CONFIGS_REQUEST_V3 = new Schema( + new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."), + new Field(INCLUDE_SYNONYMS, BOOLEAN), + new Field(INCLUDE_DOCUMENTATION, BOOLEAN)); + public static Schema[] schemaVersions() { - return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2}; + return new Schema[] { + DESCRIBE_CONFIGS_REQUEST_V0, + DESCRIBE_CONFIGS_REQUEST_V1, + DESCRIBE_CONFIGS_REQUEST_V2, + DESCRIBE_CONFIGS_REQUEST_V3 + }; } public static class Builder extends AbstractRequest.Builder<DescribeConfigsRequest> { private final Map<ConfigResource, Collection<String>> resourceToConfigNames; private boolean includeSynonyms; + private boolean includeDocumentation; public Builder(Map<ConfigResource, Collection<String>> resourceToConfigNames) { super(ApiKeys.DESCRIBE_CONFIGS); @@ -79,6 +91,11 @@ public class DescribeConfigsRequest extends AbstractRequest { return this; } + public Builder includeDocumentation(boolean includeDocumentation) { + this.includeDocumentation = includeDocumentation; + return this; + } + public Builder(Collection<ConfigResource> resources) { this(toResourceToConfigNames(resources)); } @@ -92,17 +109,27 @@ public class DescribeConfigsRequest extends AbstractRequest { @Override public DescribeConfigsRequest build(short version) { - return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms); + return new DescribeConfigsRequest( + version, resourceToConfigNames, includeSynonyms, includeDocumentation); } } private final Map<ConfigResource, Collection<String>> resourceToConfigNames; private final boolean includeSynonyms; + private final boolean includeDocumentation; - public DescribeConfigsRequest(short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) { + public DescribeConfigsRequest( + short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, + boolean includeSynonyms) { + this(version, resourceToConfigNames, includeSynonyms, false); + } + public DescribeConfigsRequest( + short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, + boolean includeSynonyms, boolean includeDocumentation) { super(ApiKeys.DESCRIBE_CONFIGS, version); this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames"); this.includeSynonyms = includeSynonyms; + this.includeDocumentation = includeDocumentation; } public DescribeConfigsRequest(Struct struct, short version) { @@ -125,6 +152,7 @@ public class DescribeConfigsRequest extends AbstractRequest { resourceToConfigNames.put(new ConfigResource(resourceType, resourceName), configNames); } this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false; + this.includeDocumentation = struct.hasField(INCLUDE_DOCUMENTATION) ? struct.getBoolean(INCLUDE_DOCUMENTATION) : false; } public Collection<ConfigResource> resources() { @@ -142,6 +170,10 @@ public class DescribeConfigsRequest extends AbstractRequest { return includeSynonyms; } + public boolean includeDocumentation() { + return includeDocumentation; + } + @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version())); @@ -159,6 +191,7 @@ public class DescribeConfigsRequest extends AbstractRequest { } struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms); + struct.setIfExists(INCLUDE_DOCUMENTATION, includeDocumentation); return struct; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index c24bbe6..bd46bc3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -56,6 +56,8 @@ public class DescribeConfigsResponse extends AbstractResponse { private static final String IS_SENSITIVE_KEY_NAME = "is_sensitive"; private static final String IS_DEFAULT_KEY_NAME = "is_default"; private static final String READ_ONLY_KEY_NAME = "read_only"; + private static final String CONFIG_TYPE_KEY_NAME = "config_type"; + private static final String CONFIG_DOCUMENTATION_KEY_NAME = "config_documentation"; private static final String CONFIG_SYNONYMS_KEY_NAME = "config_synonyms"; private static final String CONFIG_SOURCE_KEY_NAME = "config_source"; @@ -80,6 +82,16 @@ public class DescribeConfigsResponse extends AbstractResponse { new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN), new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1))); + private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3 = new Schema( + new Field(CONFIG_NAME_KEY_NAME, STRING), + new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING), + new Field(READ_ONLY_KEY_NAME, BOOLEAN), + new Field(CONFIG_SOURCE_KEY_NAME, INT8), + new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN), + new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)), + new Field(CONFIG_TYPE_KEY_NAME, INT8), + new Field(CONFIG_DOCUMENTATION_KEY_NAME, NULLABLE_STRING)); + private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, @@ -94,6 +106,13 @@ public class DescribeConfigsResponse extends AbstractResponse { new Field(RESOURCE_NAME_KEY_NAME, STRING), new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1))); + private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3 = new Schema( + ERROR_CODE, + ERROR_MESSAGE, + new Field(RESOURCE_TYPE_KEY_NAME, INT8), + new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V3))); + private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0))); @@ -102,13 +121,22 @@ public class DescribeConfigsResponse extends AbstractResponse { THROTTLE_TIME_MS, new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1))); + private static final Schema DESCRIBE_CONFIGS_RESPONSE_V3 = new Schema( + THROTTLE_TIME_MS, + new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V3))); + /** * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. */ private static final Schema DESCRIBE_CONFIGS_RESPONSE_V2 = DESCRIBE_CONFIGS_RESPONSE_V1; public static Schema[] schemaVersions() { - return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1, DESCRIBE_CONFIGS_RESPONSE_V2}; + return new Schema[]{ + DESCRIBE_CONFIGS_RESPONSE_V0, + DESCRIBE_CONFIGS_RESPONSE_V1, + DESCRIBE_CONFIGS_RESPONSE_V2, + DESCRIBE_CONFIGS_RESPONSE_V3 + }; } public static class Config { @@ -136,9 +164,16 @@ public class DescribeConfigsResponse extends AbstractResponse { private final ConfigSource source; private final boolean readOnly; private final Collection<ConfigSynonym> synonyms; + private final ConfigType type; + private final String documentation; public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly, - Collection<ConfigSynonym> synonyms) { + Collection<ConfigSynonym> synonyms) { + this(name, value, source, isSensitive, readOnly, synonyms, ConfigType.UNKNOWN, null); + } + + public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly, + Collection<ConfigSynonym> synonyms, ConfigType type, String documentation) { this.name = Objects.requireNonNull(name, "name"); this.value = value; @@ -146,6 +181,8 @@ public class DescribeConfigsResponse extends AbstractResponse { this.isSensitive = isSensitive; this.readOnly = readOnly; this.synonyms = Objects.requireNonNull(synonyms, "synonyms"); + this.type = type; + this.documentation = documentation; } public String name() { @@ -171,6 +208,14 @@ public class DescribeConfigsResponse extends AbstractResponse { public Collection<ConfigSynonym> synonyms() { return synonyms; } + + public ConfigType type() { + return type; + } + + public String documentation() { + return documentation; + } } public enum ConfigSource { @@ -198,6 +243,34 @@ public class DescribeConfigsResponse extends AbstractResponse { } } + public enum ConfigType { + UNKNOWN((byte) 0), + BOOLEAN((byte) 1), + STRING((byte) 2), + INT((byte) 3), + SHORT((byte) 4), + LONG((byte) 5), + DOUBLE((byte) 6), + LIST((byte) 7), + CLASS((byte) 8), + PASSWORD((byte) 9); + + final byte id; + private static final ConfigType[] VALUES = values(); + + ConfigType(byte id) { + this.id = id; + } + + public static ConfigType forId(byte id) { + if (id < 0) + throw new IllegalArgumentException("id should be positive, id: " + id); + if (id >= VALUES.length) + return UNKNOWN; + return VALUES[id]; + } + } + public static class ConfigSynonym { private final String name; private final String value; @@ -248,6 +321,14 @@ public class DescribeConfigsResponse extends AbstractResponse { String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME); String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME); boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME); + ConfigType type = ConfigType.UNKNOWN; + if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME)) { + type = ConfigType.forId(configEntriesStruct.getByte(CONFIG_TYPE_KEY_NAME)); + } + String documentation = null; + if (configEntriesStruct.hasField(CONFIG_DOCUMENTATION_KEY_NAME)) { + documentation = configEntriesStruct.getString(CONFIG_DOCUMENTATION_KEY_NAME); + } ConfigSource configSource; if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME)) configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME)); @@ -281,9 +362,10 @@ public class DescribeConfigsResponse extends AbstractResponse { ConfigSource source = ConfigSource.forId(synonymStruct.getByte(CONFIG_SOURCE_KEY_NAME)); synonyms.add(new ConfigSynonym(synonymConfigName, synonymConfigValue, source)); } - } else + } else { synonyms = Collections.emptyList(); - configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms)); + } + configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms, type, documentation)); } Config config = new Config(error, configEntries); configs.put(resource, config); @@ -336,6 +418,10 @@ public class DescribeConfigsResponse extends AbstractResponse { configEntriesStruct.setIfExists(CONFIG_SOURCE_KEY_NAME, configEntry.source.id); configEntriesStruct.setIfExists(IS_DEFAULT_KEY_NAME, configEntry.source == ConfigSource.DEFAULT_CONFIG); configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly); + if (configEntriesStruct.hasField(CONFIG_TYPE_KEY_NAME) && configEntry.type != null) { + configEntriesStruct.set(CONFIG_TYPE_KEY_NAME, configEntry.type.id); + } + configEntriesStruct.setIfExists(CONFIG_DOCUMENTATION_KEY_NAME, configEntry.documentation); configEntryStructs.add(configEntriesStruct); if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) { List<Struct> configSynonymStructs = new ArrayList<>(configEntry.synonyms.size()); diff --git a/clients/src/main/resources/common/message/DescribeConfigsRequest.json b/clients/src/main/resources/common/message/DescribeConfigsRequest.json index 5d4d13b..1438a36 100644 --- a/clients/src/main/resources/common/message/DescribeConfigsRequest.json +++ b/clients/src/main/resources/common/message/DescribeConfigsRequest.json @@ -17,9 +17,9 @@ "apiKey": 32, "type": "request", "name": "DescribeConfigsRequest", - // Version 1 adds IncludeSynoyms. + // Version 1 adds IncludeSynonyms. // Version 2 is the same as version 1. - "validVersions": "0-2", + "validVersions": "0-3", "flexibleVersions": "none", "fields": [ { "name": "Resources", "type": "[]DescribeConfigsResource", "versions": "0+", @@ -31,7 +31,9 @@ { "name": "ConfigurationKeys", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "about": "The configuration keys to list, or null to list all configuration keys." } ]}, - { "name": "IncludeSynoyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false, - "about": "True if we should include all synonyms." } + { "name": "IncludeSynonyms", "type": "bool", "versions": "1+", "default": "false", "ignorable": false, + "about": "True if we should include all synonyms." }, + { "name": "IncludeDocumentation", "type": "bool", "versions": "3+", "default": "false", "ignorable": false, + "about": "True if we should include configuration documentation." } ] } diff --git a/clients/src/main/resources/common/message/DescribeConfigsResponse.json b/clients/src/main/resources/common/message/DescribeConfigsResponse.json index 82e44f7..a70c959 100644 --- a/clients/src/main/resources/common/message/DescribeConfigsResponse.json +++ b/clients/src/main/resources/common/message/DescribeConfigsResponse.json @@ -19,7 +19,7 @@ "name": "DescribeConfigsResponse", // Version 1 adds ConfigSource and the synonyms. // Starting in version 2, on quota violation, brokers send out responses before throttling. - "validVersions": "0-2", + "validVersions": "0-3", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", @@ -59,7 +59,11 @@ "about": "The synonym value." }, { "name": "Source", "type": "int8", "versions": "1+", "about": "The synonym source." } - ]} + ]}, + { "name": "ConfigType", "type": "int8", "versions": "3+", "default": "0", "ignorable": true, + "about": "The configuration data type. Type can be one of the following values - BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD" }, + { "name": "Documentation", "type": "string", "versions": "3+", "nullableVersions": "0+", "ignorable": true, + "about": "The configuration documentation." } ]} ]} ] diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java index 1146a35..ed4c0bd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.clients.admin.ConfigEntry.ConfigType; import org.junit.Before; import org.junit.Test; @@ -92,6 +93,6 @@ public class ConfigTest { public static ConfigEntry newConfigEntry(String name, String value, ConfigEntry.ConfigSource source, boolean isSensitive, boolean isReadOnly, List<ConfigEntry.ConfigSynonym> synonyms) { - return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms); + return new ConfigEntry(name, value, source, isSensitive, isReadOnly, synonyms, ConfigType.UNKNOWN, null); } } diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index d395a87..73f83d7 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -482,6 +482,25 @@ public class AbstractConfigTest { assertEquals(config.originals().get("sasl.truststore.location"), "/usr/vault"); } + @Test + public void testDocumentationOf() { + Properties props = new Properties(); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); + + assertEquals( + config.documentationOf(TestIndirectConfigResolution.INDIRECT_CONFIGS), + TestIndirectConfigResolution.INDIRECT_CONFIGS_DOC + ); + } + + @Test + public void testDocumentationOfExpectNull() { + Properties props = new Properties(); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props); + + assertNull(config.documentationOf("xyz")); + } + private static class TestIndirectConfigResolution extends AbstractConfig { private static final ConfigDef CONFIG; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c712c18..c508dbc 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -144,6 +144,7 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.CreateTopicsRequest.Builder; +import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigType; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; @@ -434,6 +435,9 @@ public class RequestResponseTest { checkResponse(createDescribeConfigsResponse(), 0, false); checkRequest(createDescribeConfigsRequest(1), true); checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false); + checkRequest(createDescribeConfigsRequestWithDocumentation(1), false); + checkRequest(createDescribeConfigsRequestWithDocumentation(2), false); + checkRequest(createDescribeConfigsRequestWithDocumentation(3), false); checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true); checkResponse(createDescribeConfigsResponse(), 1, false); checkDescribeConfigsResponseVersions(); @@ -498,7 +502,12 @@ public class RequestResponseTest { assertEquals(expectedEntry.value(), entry.value()); assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly()); assertEquals(expectedEntry.isSensitive(), entry.isSensitive()); - if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG && + if (version < 3) { + assertEquals(ConfigType.UNKNOWN, entry.type()); + } else { + assertEquals(expectedEntry.type(), entry.type()); + } + if (version == 1 || version == 3 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG && expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) assertEquals(expectedEntry.source(), entry.source()); else @@ -516,6 +525,10 @@ public class RequestResponseTest { DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response, response.toStruct((short) 1), (short) 1); verifyDescribeConfigsResponse(response, deserialized1, 1); + + DescribeConfigsResponse deserialized3 = (DescribeConfigsResponse) deserialize(response, + response.toStruct((short) 3), (short) 3); + verifyDescribeConfigsResponse(response, deserialized3, 3); } private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) { @@ -1873,6 +1886,12 @@ public class RequestResponseTest { return new DescribeConfigsRequest.Builder(resources).build((short) version); } + private DescribeConfigsRequest createDescribeConfigsRequestWithDocumentation(int version) { + Map<ConfigResource, Collection<String>> resources = new HashMap<>(); + resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar")); + return new DescribeConfigsRequest.Builder(resources).includeDocumentation(true).build((short) version); + } + private DescribeConfigsResponse createDescribeConfigsResponse() { Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>(); List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList(); @@ -1880,7 +1899,10 @@ public class RequestResponseTest { new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms), new DescribeConfigsResponse.ConfigEntry("another_name", "another value", - DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms) + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms), + new DescribeConfigsResponse.ConfigEntry("yet_another_name", "yet another value", + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms, + ConfigType.BOOLEAN, "some description") ); configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new DescribeConfigsResponse.Config( ApiError.NONE, configEntries)); diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 3330ee9..183a5d3 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -164,7 +164,7 @@ class AdminManager(val config: KafkaConfig, // For responses with DescribeConfigs permission, populate metadata and configs includeConfigsAndMetatadata.get(topic.name).foreach { result => val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs) - val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false)(_, _) + val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _) val topicConfigs = logConfig.values.asScala.map { case (k, v) => val entry = createEntry(k, v) val source = ConfigSource.values.indices.map(_.toByte) @@ -347,7 +347,7 @@ class AdminManager(val config: KafkaConfig, } } - def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = { + def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean, includeDocumentation: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = { resourceToConfigNames.map { case (resource, configNames) => def allConfigs(config: AbstractConfig) = { @@ -374,7 +374,7 @@ class AdminManager(val config: KafkaConfig, // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic) val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps) - createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms)) + createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation)) } else { new DescribeConfigsResponse.Config(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) } @@ -382,10 +382,10 @@ class AdminManager(val config: KafkaConfig, case ConfigResource.Type.BROKER => if (resource.name == null || resource.name.isEmpty) createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs, - createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms)) + createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation)) else if (resourceNameToBrokerId(resource.name) == config.brokerId) createResponseConfig(allConfigs(config), - createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms)) + createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation)) else throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}") @@ -658,6 +658,27 @@ class AdminManager(val config: KafkaConfig, DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true) } + private def brokerDocumentation(name: String): String = { + config.documentationOf(name) + } + + private def configResponseType(configType: Option[ConfigDef.Type]): DescribeConfigsResponse.ConfigType = { + if (configType.isEmpty) + DescribeConfigsResponse.ConfigType.UNKNOWN + else configType.get match { + case ConfigDef.Type.BOOLEAN => DescribeConfigsResponse.ConfigType.BOOLEAN + case ConfigDef.Type.STRING => DescribeConfigsResponse.ConfigType.STRING + case ConfigDef.Type.INT => DescribeConfigsResponse.ConfigType.INT + case ConfigDef.Type.SHORT => DescribeConfigsResponse.ConfigType.SHORT + case ConfigDef.Type.LONG => DescribeConfigsResponse.ConfigType.LONG + case ConfigDef.Type.DOUBLE => DescribeConfigsResponse.ConfigType.DOUBLE + case ConfigDef.Type.LIST => DescribeConfigsResponse.ConfigType.LIST + case ConfigDef.Type.CLASS => DescribeConfigsResponse.ConfigType.CLASS + case ConfigDef.Type.PASSWORD => DescribeConfigsResponse.ConfigType.PASSWORD + case _ => DescribeConfigsResponse.ConfigType.UNKNOWN + } + } + private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = { val dynamicConfig = config.dynamicConfig val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]() @@ -676,7 +697,7 @@ class AdminManager(val config: KafkaConfig, allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config } - private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean) + private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = { val configEntryType = LogConfig.configType(name) val isSensitive = KafkaConfig.maybeSensitive(configEntryType) @@ -692,10 +713,12 @@ class AdminManager(val config: KafkaConfig, } val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source val synonyms = if (!includeSynonyms) List.empty else allSynonyms - new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava) + val dataType = configResponseType(configEntryType) + val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null + new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava, dataType, configDocumentation) } - private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean) + private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = { val allNames = brokerSynonyms(name) val configEntryType = KafkaConfig.configType(name) @@ -711,7 +734,9 @@ class AdminManager(val config: KafkaConfig, val synonyms = if (!includeSynonyms) List.empty else allSynonyms val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name) - new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava) + val dataType = configResponseType(configEntryType) + val configDocumentation = if (includeDocumentation) brokerDocumentation(name) else null + new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava, dataType, configDocumentation) } private def sanitizeEntityName(entityName: String): String = diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 28e20cd..86649c1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2579,7 +2579,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource => resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet) - }.toMap, describeConfigsRequest.includeSynonyms) + }.toMap, describeConfigsRequest.includeSynonyms, describeConfigsRequest.includeDocumentation) val unauthorizedConfigs = unauthorizedResources.map { resource => val error = configsAuthorizationApiError(resource) resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 0b9c4a4..9da7319 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -257,7 +257,7 @@ class KafkaApisTest { val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) val config = new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) - EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true))) + EasyMock.expect(adminManager.describeConfigs(anyObject(), EasyMock.eq(true), EasyMock.eq(false))) .andReturn(Map(configResource -> config)) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,