KAFKA-2687: Add support for ListGroups and DescribeGroup APIs Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang, Jun Rao Closes #388 from hachikuji/K2687 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/596c203a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/596c203a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/596c203a Branch: refs/heads/trunk Commit: 596c203af1f33360c04f4be7c466310d11343f78 Parents: f413143 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Nov 3 14:46:04 2015 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Nov 3 14:46:04 2015 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 34 +-- .../consumer/internals/AbstractCoordinator.java | 20 +- .../consumer/internals/ConsumerCoordinator.java | 2 +- .../consumer/internals/ConsumerProtocol.java | 2 + .../kafka/clients/producer/ProducerConfig.java | 70 +++--- .../apache/kafka/common/config/ConfigDef.java | 17 ++ .../apache/kafka/common/config/SaslConfigs.java | 8 + .../apache/kafka/common/config/SslConfigs.java | 17 ++ .../apache/kafka/common/protocol/ApiKeys.java | 6 +- .../apache/kafka/common/protocol/Errors.java | 3 +- .../apache/kafka/common/protocol/Protocol.java | 82 ++++++- .../kafka/common/protocol/types/Struct.java | 8 + .../kafka/common/requests/AbstractRequest.java | 8 +- .../common/requests/DescribeGroupsRequest.java | 68 ++++++ .../common/requests/DescribeGroupsResponse.java | 224 +++++++++++++++++ .../requests/GroupCoordinatorRequest.java | 65 +++++ .../requests/GroupCoordinatorResponse.java | 70 ++++++ .../common/requests/GroupMetadataRequest.java | 65 ----- .../common/requests/GroupMetadataResponse.java | 70 ------ .../common/requests/ListGroupsRequest.java | 57 +++++ .../common/requests/ListGroupsResponse.java | 107 ++++++++ .../internals/ConsumerCoordinatorTest.java | 4 +- .../common/requests/RequestResponseTest.java | 45 +++- .../distributed/WorkerCoordinatorTest.java | 4 +- .../main/scala/kafka/admin/AdminClient.scala | 242 +++++++++++++++++++ .../kafka/api/GroupCoordinatorRequest.scala | 80 ++++++ .../kafka/api/GroupCoordinatorResponse.scala | 58 +++++ .../scala/kafka/api/GroupMetadataRequest.scala | 80 ------ .../scala/kafka/api/GroupMetadataResponse.scala | 58 ----- core/src/main/scala/kafka/api/RequestKeys.scala | 10 +- .../main/scala/kafka/client/ClientUtils.scala | 4 +- .../main/scala/kafka/common/ErrorMapping.scala | 1 + .../scala/kafka/consumer/SimpleConsumer.scala | 4 +- .../kafka/coordinator/GroupCoordinator.scala | 104 ++++++-- .../scala/kafka/coordinator/GroupMetadata.scala | 36 ++- .../coordinator/GroupMetadataManager.scala | 38 +-- .../kafka/coordinator/MemberMetadata.scala | 24 +- .../javaapi/GroupCoordinatorResponse.scala | 47 ++++ .../kafka/javaapi/GroupMetadataResponse.scala | 47 ---- .../scala/kafka/network/RequestChannel.scala | 3 +- .../src/main/scala/kafka/server/KafkaApis.scala | 90 +++++-- .../integration/kafka/api/AdminClientTest.scala | 114 +++++++++ .../kafka/api/AuthorizerIntegrationTest.scala | 8 +- .../scala/other/kafka/TestOffsetManager.scala | 4 +- .../api/RequestResponseSerializationTest.scala | 10 +- .../GroupCoordinatorResponseTest.scala | 144 +++++++++-- .../kafka/coordinator/GroupMetadataTest.scala | 23 +- .../kafka/coordinator/MemberMetadataTest.scala | 31 +-- .../unit/kafka/server/OffsetCommitTest.scala | 4 +- 49 files changed, 1765 insertions(+), 555 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index b366efd..4131352 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -17,8 +17,6 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.Deserializer; import java.util.HashMap; @@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) - .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) - .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) - .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) - .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 40 * 1000, @@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig { Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, - CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC); + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + + // security support + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .withClientSslSupport() + .withClientSaslSupport(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d8f3c25..9cf825c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -30,8 +30,8 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.GroupMetadataRequest; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorRequest; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -450,8 +450,8 @@ public abstract class AbstractCoordinator { } else { // create a group metadata request log.debug("Issuing group metadata request to broker {}", node.id()); - GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId); - return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest) + GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); + return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(new RequestFutureAdapter<ClientResponse, Void>() { @Override public void onSuccess(ClientResponse response, RequestFuture<Void> future) { @@ -472,14 +472,14 @@ public abstract class AbstractCoordinator { // We already found the coordinator, so ignore the request future.complete(null); } else { - GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody()); + GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody()); // use MAX_VALUE - node.id as the coordinator id to mimic separate connections // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 - if (groupMetadataResponse.errorCode() == Errors.NONE.code()) { - this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(), - groupMetadataResponse.node().host(), - groupMetadataResponse.node().port()); + if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) { + this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), + groupCoordinatorResponse.node().host(), + groupCoordinatorResponse.node().port()); client.tryConnect(coordinator); @@ -488,7 +488,7 @@ public abstract class AbstractCoordinator { heartbeatTask.reset(); future.complete(null); } else { - future.raise(Errors.forCode(groupMetadataResponse.errorCode())); + future.raise(Errors.forCode(groupCoordinatorResponse.errorCode())); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 97d25c3..5b5d6ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -119,7 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl @Override public String protocolType() { - return "consumer"; + return ConsumerProtocol.PROTOCOL_TYPE; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 0020993..4728a50 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -54,6 +54,8 @@ import java.util.Map; */ public class ConsumerProtocol { + public static final String PROTOCOL_TYPE = "consumer"; + public static final String VERSION_KEY_NAME = "version"; public static final String TOPICS_KEY_NAME = "topics"; public static final String TOPIC_KEY_NAME = "topic"; http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index e9d9fad..3eaea09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -12,23 +12,22 @@ */ package org.apache.kafka.clients.producer; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; -import static org.apache.kafka.common.config.ConfigDef.Range.between; -import static org.apache.kafka.common.config.ConfigDef.ValidString.in; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.serialization.Serializer; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; + /** * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a * href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a> @@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig { atLeast(1), Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) - .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) - .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) - .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) - .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + VALUE_SERIALIZER_CLASS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ - .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) - .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC); + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + Type.LONG, + 9 * 60 * 1000, + Importance.MEDIUM, + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + .define(PARTITIONER_CLASS_CONFIG, + Type.CLASS, + DefaultPartitioner.class.getName(), + Importance.MEDIUM, PARTITIONER_CLASS_DOC) + + // security support + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .withClientSslSupport() + .withClientSaslSupport(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 2e820dd..f01ed28 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -151,6 +151,23 @@ public class ConfigDef { return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required); } + /** + * Add standard SSL client configuration options. + * @return this + */ + public ConfigDef withClientSslSupport() { + SslConfigs.addClientSslSupport(this); + return this; + } + + /** + * Add standard SASL client configuration options. + * @return this + */ + public ConfigDef withClientSaslSupport() { + SaslConfigs.addClientSaslSupport(this); + return this; + } /** * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java index 657c6d3..0046868 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java @@ -49,4 +49,12 @@ public class SaslConfigs { "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>."; public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT"); + public static void addClientSaslSupport(ConfigDef config) { + config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) + .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) + .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) + .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) + .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 60e1eb3..8f93301 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -96,4 +96,21 @@ public class SslConfigs { + " unlike requested , if this option is set client can choose not to provide authentication information about itself" + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed."; + public static void addClientSslSupport(ConfigDef config) { + config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) + .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index af7b266..5bd3c96 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -30,11 +30,13 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), OFFSET_COMMIT(8, "OffsetCommit"), OFFSET_FETCH(9, "OffsetFetch"), - GROUP_METADATA(10, "GroupMetadata"), + GROUP_COORDINATOR(10, "GroupCoordinator"), JOIN_GROUP(11, "JoinGroup"), HEARTBEAT(12, "Heartbeat"), LEAVE_GROUP(13, "LeaveGroup"), - SYNC_GROUP(14, "SyncGroup"); + SYNC_GROUP(14, "SyncGroup"), + DESCRIBE_GROUPS(15, "DescribeGroups"), + LIST_GROUPS(16, "ListGroups"); private static ApiKeys[] codeToType; public static final int MAX_API_KEY; http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2c9cb20..d4eb1f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -86,7 +86,8 @@ public enum Errors { new ApiException("The session timeout is not within an acceptable range.")), INVALID_COMMIT_OFFSET_SIZE(28, new ApiException("The committing offset data size is not valid")), - AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")), + AUTHORIZATION_FAILED(29, + new ApiException("Request is not authorized.")), REBALANCE_IN_PROGRESS(30, new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")); http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 36094b0..00560db 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -388,18 +388,71 @@ public class Protocol { public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1}; public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; - /* Consumer metadata api */ - public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id", - STRING, - "The unique group id.")); + /* List groups api */ + public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); - public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), - new Field("coordinator", - BROKER, - "Host and port information for the coordinator for a consumer group.")); + public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING), + new Field("protocol_type", STRING)); + public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); - public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0}; - public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0}; + public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0}; + public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0}; + + /* Describe group api */ + public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids", + new ArrayOf(STRING), + "List of groupIds to request metadata for (an empty groupId array will return empty group metadata).")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", + STRING, + "The memberId assigned by the coordinator"), + new Field("client_id", + STRING, + "The client id used in the member's latest join group request"), + new Field("client_host", + STRING, + "The client host used in the request session corresponding to the member's join group."), + new Field("member_metadata", + BYTES, + "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."), + new Field("member_assignment", + BYTES, + "The current assignment provided by the group leader (will only be present if the group is stable).")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16), + new Field("group_id", + STRING), + new Field("state", + STRING, + "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"), + new Field("protocol_type", + STRING, + "The current group protocol type (will be empty if the there is no active group)"), + new Field("protocol", + STRING, + "The current group protocol (only provided if the group is Stable)"), + new Field("members", + new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), + "Current group members (only provided if the group is not Dead)")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); + + public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0}; + public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0}; + + /* Group coordinator api */ + public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The unique group id.")); + + public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("coordinator", + BROKER, + "Host and port information for the coordinator for a consumer group.")); + + public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0}; + public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0}; /* Controlled shutdown api */ public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id", @@ -616,12 +669,13 @@ public class Protocol { REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; - REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST; + REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST; REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST; REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST; REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST; REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST; - + REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST; + REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; @@ -633,11 +687,13 @@ public class Protocol { RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; - RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE; + RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE; RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE; RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE; RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE; RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE; + RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE; + RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index ef2525e..54c3deb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -98,6 +98,14 @@ public class Struct { return (Struct) get(name); } + public Byte getByte(Field field) { + return (Byte) get(field); + } + + public byte getByte(String name) { + return (Byte) get(name); + } + public Short getShort(Field field) { return (Short) get(field); } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 03e77a5..8dfa3f6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -49,8 +49,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return OffsetCommitRequest.parse(buffer, versionId); case OFFSET_FETCH: return OffsetFetchRequest.parse(buffer, versionId); - case GROUP_METADATA: - return GroupMetadataRequest.parse(buffer, versionId); + case GROUP_COORDINATOR: + return GroupCoordinatorRequest.parse(buffer, versionId); case JOIN_GROUP: return JoinGroupRequest.parse(buffer, versionId); case HEARTBEAT: @@ -67,6 +67,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return UpdateMetadataRequest.parse(buffer, versionId); case LEADER_AND_ISR: return LeaderAndIsrRequest.parse(buffer, versionId); + case DESCRIBE_GROUPS: + return DescribeGroupsRequest.parse(buffer, versionId); + case LIST_GROUPS: + return ListGroupsRequest.parse(buffer, versionId); default: return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java new file mode 100644 index 0000000..a545cca --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class DescribeGroupsRequest extends AbstractRequest { + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id); + private static final String GROUP_IDS_KEY_NAME = "group_ids"; + + private final List<String> groupIds; + + public DescribeGroupsRequest(List<String> groupIds) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray()); + this.groupIds = groupIds; + } + + public DescribeGroupsRequest(Struct struct) { + super(struct); + this.groupIds = new ArrayList<>(); + for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME)) + this.groupIds.add((String) groupId); + } + + public List<String> groupIds() { + return groupIds; + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds); + + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id))); + } + } + + public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) { + return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer)); + } + + public static DescribeGroupsRequest parse(ByteBuffer buffer) { + return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java new file mode 100644 index 0000000..c71e7d2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DescribeGroupsResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id); + + private static final String GROUPS_KEY_NAME = "groups"; + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String GROUP_ID_KEY_NAME = "group_id"; + private static final String GROUP_STATE_KEY_NAME = "state"; + private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + private static final String PROTOCOL_KEY_NAME = "protocol"; + + private static final String MEMBERS_KEY_NAME = "members"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final String CLIENT_ID_KEY_NAME = "client_id"; + private static final String CLIENT_HOST_KEY_NAME = "client_host"; + private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; + private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; + + public static final String UNKNOWN_STATE = ""; + public static final String UNKNOWN_PROTOCOL_TYPE = ""; + public static final String UNKNOWN_PROTOCOL = ""; + + /** + * Possible per-group error codes: + * + * GROUP_LOAD_IN_PROGRESS (14) + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) + * AUTHORIZATION_FAILED (29) + */ + + private final Map<String, GroupMetadata> groups; + + public DescribeGroupsResponse(Map<String, GroupMetadata> groups) { + super(new Struct(CURRENT_SCHEMA)); + + List<Struct> groupStructs = new ArrayList<>(); + for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) { + Struct groupStruct = struct.instance(GROUPS_KEY_NAME); + GroupMetadata group = groupEntry.getValue(); + groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey()); + groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode); + groupStruct.set(GROUP_STATE_KEY_NAME, group.state); + groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); + groupStruct.set(PROTOCOL_KEY_NAME, group.protocol); + List<Struct> membersList = new ArrayList<>(); + for (GroupMember member : group.members) { + Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME); + memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId); + memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId); + memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost); + memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata); + memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment); + membersList.add(memberStruct); + } + groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray()); + groupStructs.add(groupStruct); + } + struct.set(GROUPS_KEY_NAME, groupStructs.toArray()); + this.groups = groups; + } + + public DescribeGroupsResponse(Struct struct) { + super(struct); + this.groups = new HashMap<>(); + for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { + Struct groupStruct = (Struct) groupObj; + + String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME); + String state = groupStruct.getString(GROUP_STATE_KEY_NAME); + String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); + String protocol = groupStruct.getString(PROTOCOL_KEY_NAME); + + List<GroupMember> members = new ArrayList<>(); + for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) { + Struct memberStruct = (Struct) memberObj; + String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME); + String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME); + String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME); + ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME); + ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); + members.add(new GroupMember(memberId, clientId, clientHost, + memberMetadata, memberAssignment)); + } + this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members)); + } + } + + public Map<String, GroupMetadata> groups() { + return groups; + } + + + public static class GroupMetadata { + private final short errorCode; + private final String state; + private final String protocolType; + private final String protocol; + private final List<GroupMember> members; + + public GroupMetadata(short errorCode, + String state, + String protocolType, + String protocol, + List<GroupMember> members) { + this.errorCode = errorCode; + this.state = state; + this.protocolType = protocolType; + this.protocol = protocol; + this.members = members; + } + + public short errorCode() { + return errorCode; + } + + public String state() { + return state; + } + + public String protocolType() { + return protocolType; + } + + public String protocol() { + return protocol; + } + + public List<GroupMember> members() { + return members; + } + + public static GroupMetadata forError(Errors error) { + return new DescribeGroupsResponse.GroupMetadata( + error.code(), + DescribeGroupsResponse.UNKNOWN_STATE, + DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE, + DescribeGroupsResponse.UNKNOWN_PROTOCOL, + Collections.<DescribeGroupsResponse.GroupMember>emptyList()); + } + } + + public static class GroupMember { + private final String memberId; + private final String clientId; + private final String clientHost; + private final ByteBuffer memberMetadata; + private final ByteBuffer memberAssignment; + + public GroupMember(String memberId, + String clientId, + String clientHost, + ByteBuffer memberMetadata, + ByteBuffer memberAssignment) { + this.memberId = memberId; + this.clientId = clientId; + this.clientHost = clientHost; + this.memberMetadata = memberMetadata; + this.memberAssignment = memberAssignment; + } + + public String memberId() { + return memberId; + } + + public String clientId() { + return clientId; + } + + public String clientHost() { + return clientHost; + } + + public ByteBuffer memberMetadata() { + return memberMetadata; + } + + public ByteBuffer memberAssignment() { + return memberAssignment; + } + } + + public static DescribeGroupsResponse parse(ByteBuffer buffer) { + return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) { + GroupMetadata errorMetadata = GroupMetadata.forError(error); + Map<String, GroupMetadata> groups = new HashMap<>(); + for (String groupId : groupIds) + groups.put(groupId, errorMetadata); + return new DescribeGroupsResponse(groups); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java new file mode 100644 index 0000000..8c56e7f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class GroupCoordinatorRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id); + private static final String GROUP_ID_KEY_NAME = "group_id"; + + private final String groupId; + + public GroupCoordinatorRequest(String groupId) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(GROUP_ID_KEY_NAME, groupId); + this.groupId = groupId; + } + + public GroupCoordinatorRequest(Struct struct) { + super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id))); + } + } + + public String groupId() { + return groupId; + } + + public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) { + return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer)); + } + + public static GroupCoordinatorRequest parse(ByteBuffer buffer) { + return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java new file mode 100644 index 0000000..c28de70 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class GroupCoordinatorResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id); + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String COORDINATOR_KEY_NAME = "coordinator"; + + // coordinator level field names + private static final String NODE_ID_KEY_NAME = "node_id"; + private static final String HOST_KEY_NAME = "host"; + private static final String PORT_KEY_NAME = "port"; + + private final short errorCode; + private final Node node; + + public GroupCoordinatorResponse(short errorCode, Node node) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); + coordinator.set(NODE_ID_KEY_NAME, node.id()); + coordinator.set(HOST_KEY_NAME, node.host()); + coordinator.set(PORT_KEY_NAME, node.port()); + struct.set(COORDINATOR_KEY_NAME, coordinator); + this.errorCode = errorCode; + this.node = node; + } + + public GroupCoordinatorResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); + int nodeId = broker.getInt(NODE_ID_KEY_NAME); + String host = broker.getString(HOST_KEY_NAME); + int port = broker.getInt(PORT_KEY_NAME); + node = new Node(nodeId, host, port); + } + + public short errorCode() { + return errorCode; + } + + public Node node() { + return node; + } + + public static GroupCoordinatorResponse parse(ByteBuffer buffer) { + return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java deleted file mode 100644 index fd54c5a..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public class GroupMetadataRequest extends AbstractRequest { - - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id); - private static final String GROUP_ID_KEY_NAME = "group_id"; - - private final String groupId; - - public GroupMetadataRequest(String groupId) { - super(new Struct(CURRENT_SCHEMA)); - - struct.set(GROUP_ID_KEY_NAME, groupId); - this.groupId = groupId; - } - - public GroupMetadataRequest(Struct struct) { - super(struct); - groupId = struct.getString(GROUP_ID_KEY_NAME); - } - - @Override - public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - switch (versionId) { - case 0: - return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id))); - } - } - - public String groupId() { - return groupId; - } - - public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) { - return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer)); - } - - public static GroupMetadataRequest parse(ByteBuffer buffer) { - return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java deleted file mode 100644 index a5ef478..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public class GroupMetadataResponse extends AbstractRequestResponse { - - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id); - private static final String ERROR_CODE_KEY_NAME = "error_code"; - private static final String COORDINATOR_KEY_NAME = "coordinator"; - - // coordinator level field names - private static final String NODE_ID_KEY_NAME = "node_id"; - private static final String HOST_KEY_NAME = "host"; - private static final String PORT_KEY_NAME = "port"; - - private final short errorCode; - private final Node node; - - public GroupMetadataResponse(short errorCode, Node node) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); - coordinator.set(NODE_ID_KEY_NAME, node.id()); - coordinator.set(HOST_KEY_NAME, node.host()); - coordinator.set(PORT_KEY_NAME, node.port()); - struct.set(COORDINATOR_KEY_NAME, coordinator); - this.errorCode = errorCode; - this.node = node; - } - - public GroupMetadataResponse(Struct struct) { - super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); - Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); - int nodeId = broker.getInt(NODE_ID_KEY_NAME); - String host = broker.getString(HOST_KEY_NAME); - int port = broker.getInt(PORT_KEY_NAME); - node = new Node(nodeId, host, port); - } - - public short errorCode() { - return errorCode; - } - - public Node node() { - return node; - } - - public static GroupMetadataResponse parse(ByteBuffer buffer) { - return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java new file mode 100644 index 0000000..439720f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.Collections; + +public class ListGroupsRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id); + + public ListGroupsRequest() { + super(new Struct(CURRENT_SCHEMA)); + } + + public ListGroupsRequest(Struct struct) { + super(struct); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + short errorCode = Errors.forException(e).code(); + return new ListGroupsResponse(errorCode, Collections.<ListGroupsResponse.Group>emptyList()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id))); + } + } + + public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) { + return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer)); + } + + public static ListGroupsRequest parse(ByteBuffer buffer) { + return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java new file mode 100644 index 0000000..d07f0d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ListGroupsResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id); + + public static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final String GROUPS_KEY_NAME = "groups"; + public static final String GROUP_ID_KEY_NAME = "group_id"; + public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + + /** + * Possible error codes: + * + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * AUTHORIZATION_FAILED (29) + */ + + private final short errorCode; + private final List<Group> groups; + + public ListGroupsResponse(short errorCode, List<Group> groups) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + List<Struct> groupList = new ArrayList<>(); + for (Group group : groups) { + Struct groupStruct = struct.instance(GROUPS_KEY_NAME); + groupStruct.set(GROUP_ID_KEY_NAME, group.groupId); + groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); + groupList.add(groupStruct); + } + struct.set(GROUPS_KEY_NAME, groupList.toArray()); + this.errorCode = errorCode; + this.groups = groups; + } + + public ListGroupsResponse(Struct struct) { + super(struct); + this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.groups = new ArrayList<>(); + for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { + Struct groupStruct = (Struct) groupObj; + String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); + this.groups.add(new Group(groupId, protocolType)); + } + } + + public List<Group> groups() { + return groups; + } + + public short errorCode() { + return errorCode; + } + + public static class Group { + private final String groupId; + private final String protocolType; + + public Group(String groupId, String protocolType) { + this.groupId = groupId; + this.protocolType = protocolType; + } + + public String groupId() { + return groupId; + } + + public String protocolType() { + return protocolType; + } + + } + + public static ListGroupsResponse parse(ByteBuffer buffer) { + return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + public static ListGroupsResponse fromError(Errors error) { + return new ListGroupsResponse(error.code(), Collections.<Group>emptyList()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 2029e92..8667f22 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; @@ -729,7 +729,7 @@ public class ConsumerCoordinatorTest { } private Struct consumerMetadataResponse(Node node, short error) { - GroupMetadataResponse response = new GroupMetadataResponse(error, node); + GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); return response.toStruct(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index fb21802..761b9db 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -28,6 +28,7 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,9 +44,9 @@ public class RequestResponseTest { List<AbstractRequestResponse> requestResponseList = Arrays.asList( createRequestHeader(), createResponseHeader(), - createConsumerMetadataRequest(), - createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()), - createConsumerMetadataResponse(), + createGroupCoordinatorRequest(), + createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()), + createGroupCoordinatorResponse(), createControlledShutdownRequest(), createControlledShutdownResponse(), createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()), @@ -61,6 +62,12 @@ public class RequestResponseTest { createLeaveGroupRequest(), createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()), createLeaveGroupResponse(), + createListGroupsRequest(), + createListGroupsRequest().getErrorResponse(0, new UnknownServerException()), + createListGroupsResponse(), + createDescribeGroupRequest(), + createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()), + createDescribeGroupResponse(), createListOffsetRequest(), createListOffsetRequest().getErrorResponse(0, new UnknownServerException()), createListOffsetResponse(), @@ -150,12 +157,12 @@ public class RequestResponseTest { return new ResponseHeader(10); } - private AbstractRequest createConsumerMetadataRequest() { - return new GroupMetadataRequest("test-group"); + private AbstractRequest createGroupCoordinatorRequest() { + return new GroupCoordinatorRequest("test-group"); } - private AbstractRequestResponse createConsumerMetadataResponse() { - return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); + private AbstractRequestResponse createGroupCoordinatorResponse() { + return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); } private AbstractRequest createFetchRequest() { @@ -193,6 +200,30 @@ public class RequestResponseTest { return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members); } + private AbstractRequest createListGroupsRequest() { + return new ListGroupsRequest(); + } + + private AbstractRequestResponse createListGroupsResponse() { + List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer")); + return new ListGroupsResponse(Errors.NONE.code(), groups); + } + + private AbstractRequest createDescribeGroupRequest() { + return new DescribeGroupsRequest(Collections.singletonList("test-group")); + } + + private AbstractRequestResponse createDescribeGroupResponse() { + String clientId = "consumer-1"; + String clientHost = "localhost"; + ByteBuffer empty = ByteBuffer.allocate(0); + DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId", + clientId, clientHost, empty, empty); + DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(), + "STABLE", "consumer", "roundrobin", Arrays.asList(member)); + return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata)); + } + private AbstractRequest createLeaveGroupRequest() { return new LeaveGroupRequest("group1", "consumer1"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java index ca53674..ac9df44 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; @@ -386,7 +386,7 @@ public class WorkerCoordinatorTest { private Struct groupMetadataResponse(Node node, short error) { - GroupMetadataResponse response = new GroupMetadataResponse(error, node); + GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); return response.toStruct(); }