This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 99a4068 KAFKA-7689; Add AlterConsumerGroup/List Offsets to
AdminClient [KIP-396] (#7296)
99a4068 is described below
commit 99a4068c5ca61951d70b9e647ead3b08a2af4309
Author: Mickael Maison <[email protected]>
AuthorDate: Sun Oct 20 05:30:50 2019 +0100
KAFKA-7689; Add AlterConsumerGroup/List Offsets to AdminClient [KIP-396]
(#7296)
This patch implements new AdminClient APIs to list offsets and alter
consumer group offsets as documented in KIP-396:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484.
Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 51 ++
.../admin/AlterConsumerGroupOffsetsOptions.java} | 32 +-
.../admin/AlterConsumerGroupOffsetsResult.java | 96 ++++
.../kafka/clients/admin/KafkaAdminClient.java | 508 ++++++++++++++------
.../admin/ListOffsetsOptions.java} | 35 +-
.../kafka/clients/admin/ListOffsetsResult.java | 107 +++++
.../org/apache/kafka/clients/admin/OffsetSpec.java | 62 +++
.../internals/ConsumerGroupOperationContext.java | 87 ++++
.../admin/internals/MetadataOperationContext.java | 96 ++++
.../kafka/clients/consumer/ConsumerConfig.java | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../consumer/internals/SubscriptionState.java | 2 +-
.../common/{requests => }/IsolationLevel.java | 2 +-
.../kafka/common/requests/AlterConfigsRequest.java | 2 +-
.../common/requests/DescribeConfigsRequest.java | 2 +-
.../apache/kafka/common/requests/FetchRequest.java | 1 +
.../kafka/common/requests/ListOffsetRequest.java | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 516 ++++++++++++++++++++-
.../kafka/clients/admin/MockAdminClient.java | 11 +
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 1 +
.../scala/kafka/admin/ConsumerGroupCommand.scala | 92 ++--
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 4 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../unit/kafka/server/ListOffsetsRequestTest.scala | 4 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 4 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../apache/kafka/streams/StreamsConfigTest.java | 4 +-
.../streams/integration/EosIntegrationTest.java | 2 +-
.../streams/tests/BrokerCompatibilityTest.java | 2 +-
.../apache/kafka/streams/tests/EosTestDriver.java | 2 +-
39 files changed, 1485 insertions(+), 270 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 915bd72..a6458b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -25,6 +25,8 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -1081,6 +1083,55 @@ public interface Admin extends AutoCloseable {
MembershipChangeResult removeMemberFromConsumerGroup(String groupId,
RemoveMemberFromConsumerGroupOptions options);
/**
+ * <p>Alters offsets for the specified group. In order to succeed, the
group must be empty.
+ *
+ * <p>This is a convenience method for {@link
#alterConsumerGroupOffsets(String, Map, AlterConsumerGroupOffsetsOptions)} with
default options.
+ * See the overload for more details.
+ *
+ * @param groupId The group for which to alter offsets.
+ * @param offsets A map of offsets by partition with associated metadata.
+ * @return The AlterConsumerGroupOffsetsResult.
+ */
+ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String
groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
+ return alterConsumerGroupOffsets(groupId, offsets, new
AlterConsumerGroupOffsetsOptions());
+ }
+
+ /**
+ * <p>Alters offsets for the specified group. In order to succeed, the
group must be empty.
+ *
+ * <p>This operation is not transactional, so it may succeed for some
partitions while failing for others.
+ *
+ * @param groupId The group for which to alter offsets.
+ * @param offsets A map of offsets by partition with associated metadata.
Partitions not specified in the map are ignored.
+ * @param options The options to use when altering the offsets.
+ * @return The AlterConsumerGroupOffsetsResult.
+ */
+ AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
AlterConsumerGroupOffsetsOptions options);
+
+ /**
+ * <p>Lists offsets for the specified partitions and OffsetSpec. This
operation makes it possible to find
+ * the beginning offset, the end offset, or the offset matching a
timestamp in each partition.
+ *
+ * <p>This is a convenience method for {@link #listOffsets(Map,
ListOffsetsOptions)}
+ *
+ * @param topicPartitionOffsets The mapping from partition to the
OffsetSpec to look up.
+ * @return The ListOffsetsResult.
+ */
+ default ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
topicPartitionOffsets) {
+ return listOffsets(topicPartitionOffsets, new ListOffsetsOptions());
+ }
+
+ /**
+ * <p>Lists offsets for the specified partitions. This operation makes it possible to
find
+ * the beginning offset, the end offset, or the offset matching a
timestamp in each partition.
+ *
+ * @param topicPartitionOffsets The mapping from partition to the
OffsetSpec to look up.
+ * @param options The options to use when retrieving the offsets
+ * @return The ListOffsetsResult.
+ */
+ ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
topicPartitionOffsets, ListOffsetsOptions options);
+
+ /**
* Get the metrics kept by the adminClient
*/
Map<MetricName, ? extends Metric> metrics();
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
similarity index 58%
copy from
clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
copy to
clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
index a09b625..eb8b8ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsOptions.java
@@ -14,29 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.clients.admin;
-public enum IsolationLevel {
- READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
+import org.apache.kafka.common.annotation.InterfaceStability;
- private final byte id;
-
- IsolationLevel(byte id) {
- this.id = id;
- }
-
- public byte id() {
- return id;
- }
-
- public static IsolationLevel forId(byte id) {
- switch (id) {
- case 0:
- return READ_UNCOMMITTED;
- case 1:
- return READ_COMMITTED;
- default:
- throw new IllegalArgumentException("Unknown isolation level "
+ id);
- }
- }
+/**
+ * Options for the {@link AdminClient#alterConsumerGroupOffsets(String, Map)}
call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterConsumerGroupOffsetsOptions extends
AbstractOptions<AlterConsumerGroupOffsetsOptions> {
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
new file mode 100644
index 0000000..38ee14a
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.KafkaFuture.BaseFunction;
+import org.apache.kafka.common.KafkaFuture.BiConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+
+/**
+ * The result of the {@link AdminClient#alterConsumerGroupOffsets(String,
Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterConsumerGroupOffsetsResult {
+
+ private final KafkaFuture<Map<TopicPartition, Errors>> future;
+
+ AlterConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>>
future) {
+ this.future = future;
+ }
+
+ /**
+ * Return a future which can be used to check the result for a given
partition.
+ */
+ public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
+ final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+ this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>,
Throwable>() {
+ @Override
+ public void accept(final Map<TopicPartition, Errors>
topicPartitions, final Throwable throwable) {
+ if (throwable != null) {
+ result.completeExceptionally(throwable);
+ } else if (!topicPartitions.containsKey(partition)) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Alter offset for partition \"" + partition + "\" was
not attempted"));
+ } else {
+ final Errors error = topicPartitions.get(partition);
+ if (error == Errors.NONE) {
+ result.complete(null);
+ } else {
+ result.completeExceptionally(error.exception());
+ }
+ }
+
+ }
+ });
+
+ return result;
+ }
+
+ /**
+ * Return a future which succeeds if all the alter offsets succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return this.future.thenApply(new BaseFunction<Map<TopicPartition,
Errors>, Void>() {
+ @Override
+ public Void apply(final Map<TopicPartition, Errors>
topicPartitionErrorsMap) {
+ List<TopicPartition> partitionsFailed =
topicPartitionErrorsMap.entrySet()
+ .stream()
+ .filter(e -> e.getValue() != Errors.NONE)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ for (Errors error : topicPartitionErrorsMap.values()) {
+ if (error != Errors.NONE) {
+ throw error.exception(
+ "Failed altering consumer group offsets for the
following partitions: " + partitionsFailed);
+ }
+ }
+ return null;
+ }
+ });
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 0850ced..984e863 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,7 +29,11 @@ import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
+import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -87,6 +91,11 @@ import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import
org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestPartition;
import
org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
@@ -153,10 +162,15 @@ import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteRequest;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -187,6 +201,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -714,6 +729,7 @@ public class KafkaAdminClient extends AdminClient {
*
* @return The AbstractRequest builder.
*/
+ @SuppressWarnings("rawtypes")
abstract AbstractRequest.Builder createRequest(int timeoutMs);
/**
@@ -1271,7 +1287,7 @@ public class KafkaAdminClient extends AdminClient {
return new Call(true, "fetchMetadata", calcDeadlineMs(now,
defaultTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public MetadataRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to
pass true
// for allowAutoTopicCreation (and it simplifies
communication with
// older brokers)
@@ -1338,7 +1354,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
return new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().
setTopics(topics).
@@ -1435,7 +1451,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
return new DeleteTopicsRequest.Builder(new
DeleteTopicsRequestData()
.setTopicNames(validTopicNames)
.setTimeoutMs(timeoutMs));
@@ -1495,7 +1511,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ MetadataRequest.Builder createRequest(int timeoutMs) {
return MetadataRequest.Builder.allTopics();
}
@@ -1542,7 +1558,7 @@ public class KafkaAdminClient extends AdminClient {
private boolean supportsDisablingTopicCreation = true;
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ MetadataRequest.Builder createRequest(int timeoutMs) {
if (supportsDisablingTopicCreation)
return new MetadataRequest.Builder(new
MetadataRequestData()
.setTopics(convertToMetadataRequestTopic(topicNamesList))
@@ -1624,7 +1640,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ MetadataRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to
pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
return new MetadataRequest.Builder(new MetadataRequestData()
@@ -1676,7 +1692,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DescribeAclsRequest.Builder createRequest(int timeoutMs) {
return new DescribeAclsRequest.Builder(filter);
}
@@ -1720,7 +1736,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ CreateAclsRequest.Builder createRequest(int timeoutMs) {
return new CreateAclsRequest.Builder(aclCreations);
}
@@ -1768,7 +1784,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DeleteAclsRequest.Builder createRequest(int timeoutMs) {
return new DeleteAclsRequest.Builder(filterList);
}
@@ -1834,7 +1850,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
return new
DescribeConfigsRequest.Builder(unifiedRequestResources)
.includeSynonyms(options.includeSynonyms());
}
@@ -1881,7 +1897,7 @@ public class KafkaAdminClient extends AdminClient {
new ConstantNodeIdProvider(nodeId)) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
return new
DescribeConfigsRequest.Builder(Collections.singleton(resource))
.includeSynonyms(options.includeSynonyms());
}
@@ -1995,7 +2011,7 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("alterConfigs", calcDeadlineMs(now,
options.timeoutMs()), nodeProvider) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public AlterConfigsRequest.Builder createRequest(int timeoutMs) {
return new AlterConfigsRequest.Builder(requestMap,
options.shouldValidateOnly());
}
@@ -2055,7 +2071,7 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now,
options.timeoutMs()), nodeProvider) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public IncrementalAlterConfigsRequest.Builder createRequest(int
timeoutMs) {
return new IncrementalAlterConfigsRequest.Builder(
toIncrementalAlterConfigsRequestData(resources,
configs, options.shouldValidateOnly()));
}
@@ -2131,7 +2147,7 @@ public class KafkaAdminClient extends AdminClient {
new ConstantNodeIdProvider(brokerId)) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public AlterReplicaLogDirsRequest.Builder createRequest(int
timeoutMs) {
return new AlterReplicaLogDirsRequest.Builder(assignment);
}
@@ -2177,7 +2193,7 @@ public class KafkaAdminClient extends AdminClient {
new ConstantNodeIdProvider(brokerId)) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
// Query selected partitions in all log directories
return new DescribeLogDirsRequest.Builder(null);
}
@@ -2231,7 +2247,7 @@ public class KafkaAdminClient extends AdminClient {
new ConstantNodeIdProvider(brokerId)) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
// Query selected partitions in all log directories
return new DescribeLogDirsRequest.Builder(topicPartitions);
}
@@ -2302,7 +2318,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public CreatePartitionsRequest.Builder createRequest(int
timeoutMs) {
return new CreatePartitionsRequest.Builder(requestMap,
timeoutMs, options.validateOnly());
}
@@ -2360,7 +2376,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(convertToMetadataRequestTopic(topics))
.setAllowAutoTopicCreation(false));
@@ -2404,7 +2420,7 @@ public class KafkaAdminClient extends AdminClient {
new ConstantNodeIdProvider(brokerId)) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DeleteRecordsRequest.Builder createRequest(int
timeoutMs) {
return new DeleteRecordsRequest.Builder(timeoutMs,
partitionDeleteOffsets);
}
@@ -2455,7 +2471,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder<CreateDelegationTokenRequest>
createRequest(int timeoutMs) {
+ CreateDelegationTokenRequest.Builder createRequest(int timeoutMs) {
return new CreateDelegationTokenRequest.Builder(
new CreateDelegationTokenRequestData()
.setRenewers(renewers)
@@ -2493,7 +2509,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder<RenewDelegationTokenRequest>
createRequest(int timeoutMs) {
+ RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) {
return new RenewDelegationTokenRequest.Builder(
new RenewDelegationTokenRequestData()
.setHmac(hmac)
@@ -2527,7 +2543,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder<ExpireDelegationTokenRequest>
createRequest(int timeoutMs) {
+ ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) {
return new ExpireDelegationTokenRequest.Builder(
new ExpireDelegationTokenRequestData()
.setHmac(hmac)
@@ -2561,7 +2577,7 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DescribeDelegationTokenRequest.Builder createRequest(int
timeoutMs) {
return new
DescribeDelegationTokenRequest.Builder(options.owners());
}
@@ -2584,72 +2600,23 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(tokensFuture);
}
- /**
- * Context class to encapsulate parameters of a call to find and use a
consumer group coordinator.
- * Some of the parameters are provided at construction and are immutable
whereas others are provided
- * as "Call" are completed and values are available, like node id of the
coordinator.
- *
- * @param <T> The type of return value of the KafkaFuture
- * @param <O> The type of configuration option. Different for different
consumer group commands.
- */
- private final static class ConsumerGroupOperationContext<T, O extends
AbstractOptions<O>> {
- final private String groupId;
- final private O options;
- final private long deadline;
- final private KafkaFutureImpl<T> future;
- private Optional<Node> node;
-
- public ConsumerGroupOperationContext(String groupId,
- O options,
- long deadline,
- KafkaFutureImpl<T> future) {
- this.groupId = groupId;
- this.options = options;
- this.deadline = deadline;
- this.future = future;
- this.node = Optional.empty();
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public O getOptions() {
- return options;
- }
-
- public long getDeadline() {
- return deadline;
- }
-
- public KafkaFutureImpl<T> getFuture() {
- return future;
- }
-
- public Optional<Node> getNode() {
- return node;
- }
-
- public void setNode(Node node) {
- this.node = Optional.ofNullable(node);
- }
-
- public boolean hasCoordinatorMoved(AbstractResponse response) {
- return response.errorCounts().keySet()
- .stream()
- .anyMatch(error -> error == Errors.NOT_COORDINATOR);
- }
- }
-
- private void rescheduleTask(ConsumerGroupOperationContext<?, ?> context,
Supplier<Call> nextCall) {
+ private void
rescheduleFindCoordinatorTask(ConsumerGroupOperationContext<?, ?> context,
Supplier<Call> nextCall) {
log.info("Node {} is no longer the Coordinator. Retrying with new
coordinator.",
- context.getNode().orElse(null));
+ context.node().orElse(null));
// Requeue the task so that we can try with new coordinator
context.setNode(null);
Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall);
runnable.call(findCoordinatorCall, time.milliseconds());
}
+ private void rescheduleMetadataTask(MetadataOperationContext<?, ?>
context, Supplier<List<Call>> nextCalls) {
+ log.info("Retrying to fetch metadata.");
+ // Requeue the task so that we can re-attempt fetching metadata
+ context.setResponse(Optional.empty());
+ Call metadataCall = getMetadataCall(context, nextCalls);
+ runnable.call(metadataCall, time.milliseconds());
+ }
+
private static <T> Map<String, KafkaFutureImpl<T>>
createFutures(Collection<String> groupIds) {
return new HashSet<>(groupIds).stream().collect(
Collectors.toMap(groupId -> groupId,
@@ -2703,21 +2670,21 @@ public class KafkaAdminClient extends AdminClient {
* @param <O> The type of configuration option, like
DescribeConsumerGroupsOptions, ListConsumerGroupsOptions etc
*/
private <T, O extends AbstractOptions<O>> Call
getFindCoordinatorCall(ConsumerGroupOperationContext<T, O> context,
- Supplier<Call> nextCall) {
- return new Call("findCoordinator", context.getDeadline(), new
LeastLoadedNodeProvider()) {
+
Supplier<Call> nextCall) {
+ return new Call("findCoordinator", context.deadline(), new
LeastLoadedNodeProvider()) {
@Override
FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
return new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
- .setKey(context.getGroupId()));
+ .setKey(context.groupId()));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response =
(FindCoordinatorResponse) abstractResponse;
- if (handleGroupRequestError(response.error(),
context.getFuture()))
+ if (handleGroupRequestError(response.error(),
context.future()))
return;
context.setNode(response.node());
@@ -2727,7 +2694,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
}
};
}
@@ -2735,14 +2702,14 @@ public class KafkaAdminClient extends AdminClient {
private Call getDescribeConsumerGroupsCall(
ConsumerGroupOperationContext<ConsumerGroupDescription,
DescribeConsumerGroupsOptions> context) {
return new Call("describeConsumerGroups",
- context.getDeadline(),
- new ConstantNodeIdProvider(context.getNode().get().id())) {
+ context.deadline(),
+ new ConstantNodeIdProvider(context.node().get().id())) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DescribeGroupsRequest.Builder createRequest(int timeoutMs) {
return new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData()
-
.setGroups(Collections.singletonList(context.getGroupId()))
-
.setIncludeAuthorizedOperations(context.getOptions().includeAuthorizedOperations()));
+
.setGroups(Collections.singletonList(context.groupId()))
+
.setIncludeAuthorizedOperations(context.options().includeAuthorizedOperations()));
}
@Override
@@ -2751,29 +2718,29 @@ public class KafkaAdminClient extends AdminClient {
List<DescribedGroup> describedGroups =
response.data().groups();
if (describedGroups.isEmpty()) {
- context.getFuture().completeExceptionally(
- new InvalidGroupIdException("No consumer group
found for GroupId: " + context.getGroupId()));
+ context.future().completeExceptionally(
+ new InvalidGroupIdException("No consumer group
found for GroupId: " + context.groupId()));
return;
}
if (describedGroups.size() > 1 ||
-
!describedGroups.get(0).groupId().equals(context.getGroupId())) {
+
!describedGroups.get(0).groupId().equals(context.groupId())) {
String ids =
Arrays.toString(describedGroups.stream().map(DescribedGroup::groupId).toArray());
- context.getFuture().completeExceptionally(new
InvalidGroupIdException(
- "DescribeConsumerGroup request for GroupId: " +
context.getGroupId() + " returned " + ids));
+ context.future().completeExceptionally(new
InvalidGroupIdException(
+ "DescribeConsumerGroup request for GroupId: " +
context.groupId() + " returned " + ids));
return;
}
final DescribedGroup describedGroup = describedGroups.get(0);
// If coordinator changed since we fetched it, retry
- if (context.hasCoordinatorMoved(response)) {
- rescheduleTask(context, () ->
getDescribeConsumerGroupsCall(context));
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context, () ->
getDescribeConsumerGroupsCall(context));
return;
}
final Errors groupError =
Errors.forCode(describedGroup.errorCode());
- if (handleGroupRequestError(groupError, context.getFuture()))
+ if (handleGroupRequestError(groupError, context.future()))
return;
final String protocolType = describedGroup.protocolType();
@@ -2797,19 +2764,59 @@ public class KafkaAdminClient extends AdminClient {
memberDescriptions.add(memberDescription);
}
final ConsumerGroupDescription consumerGroupDescription =
- new ConsumerGroupDescription(context.getGroupId(),
protocolType.isEmpty(),
+ new ConsumerGroupDescription(context.groupId(),
protocolType.isEmpty(),
memberDescriptions,
describedGroup.protocolData(),
ConsumerGroupState.parse(describedGroup.groupState()),
- context.getNode().get(),
+ context.node().get(),
authorizedOperations);
- context.getFuture().complete(consumerGroupDescription);
+ context.future().complete(consumerGroupDescription);
}
}
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
+ }
+ };
+ }
+
+ /**
+ * Returns a {@code Call} object to fetch the cluster metadata. Takes a
supplier of a List of Calls
+ * to schedule the actions that need to be taken using the metadata.
The parameter is a Supplier
+ * so that the calls are created lazily and can use the results of the
metadata call in their
+ * construction.
+ *
+ * @param <T> The type of return value of the KafkaFuture, like
ListOffsetsResultInfo, etc.
+ * @param <O> The type of configuration option, like ListOffsetsOptions,
etc
+ */
+ private <T, O extends AbstractOptions<O>> Call
getMetadataCall(MetadataOperationContext<T, O> context,
+
Supplier<List<Call>> nextCalls) {
+ return new Call("metadata", context.deadline(), new
LeastLoadedNodeProvider()) {
+ @Override
+ MetadataRequest.Builder createRequest(int timeoutMs) {
+ return new MetadataRequest.Builder(new MetadataRequestData()
+ .setTopics(convertToMetadataRequestTopic(context.topics()))
+ .setAllowAutoTopicCreation(false));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse)
abstractResponse;
+ MetadataOperationContext.handleMetadataErrors(response);
+
+ context.setResponse(Optional.of(response));
+
+ for (Call call : nextCalls.get()) {
+ runnable.call(call, time.milliseconds());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ for (KafkaFutureImpl<T> future : context.futures().values()) {
+ future.completeExceptionally(throwable);
+ }
}
};
}
@@ -2890,7 +2897,7 @@ public class KafkaAdminClient extends AdminClient {
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
runnable.call(new Call("findAllBrokers", deadline, new
LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
@@ -2910,7 +2917,7 @@ public class KafkaAdminClient extends AdminClient {
final long nowList = time.milliseconds();
runnable.call(new Call("listConsumerGroups", deadline, new
ConstantNodeIdProvider(node.id())) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ ListGroupsRequest.Builder createRequest(int timeoutMs)
{
return new ListGroupsRequest.Builder(new
ListGroupsRequestData());
}
@@ -2981,11 +2988,11 @@ public class KafkaAdminClient extends AdminClient {
private Call
getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition,
OffsetAndMetadata>,
ListConsumerGroupOffsetsOptions> context) {
- return new Call("listConsumerGroupOffsets", context.getDeadline(),
- new ConstantNodeIdProvider(context.getNode().get().id())) {
+ return new Call("listConsumerGroupOffsets", context.deadline(),
+ new ConstantNodeIdProvider(context.node().get().id())) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new OffsetFetchRequest.Builder(context.getGroupId(),
context.getOptions().topicPartitions());
+ OffsetFetchRequest.Builder createRequest(int timeoutMs) {
+ return new OffsetFetchRequest.Builder(context.groupId(),
context.options().topicPartitions());
}
@Override
@@ -2994,12 +3001,12 @@ public class KafkaAdminClient extends AdminClient {
final Map<TopicPartition, OffsetAndMetadata>
groupOffsetsListing = new HashMap<>();
// If coordinator changed since we fetched it, retry
- if (context.hasCoordinatorMoved(response)) {
- rescheduleTask(context, () ->
getListConsumerGroupOffsetsCall(context));
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context, () ->
getListConsumerGroupOffsetsCall(context));
return;
}
- if (handleGroupRequestError(response.error(),
context.getFuture()))
+ if (handleGroupRequestError(response.error(),
context.future()))
return;
for (Map.Entry<TopicPartition,
OffsetFetchResponse.PartitionData> entry :
@@ -3017,12 +3024,12 @@ public class KafkaAdminClient extends AdminClient {
log.warn("Skipping return offset for {} due to error
{}.", topicPartition, error);
}
}
- context.getFuture().complete(groupOffsetsListing);
+ context.future().complete(groupOffsetsListing);
}
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
}
};
}
@@ -3053,13 +3060,13 @@ public class KafkaAdminClient extends AdminClient {
}
private Call
getDeleteConsumerGroupsCall(ConsumerGroupOperationContext<Void,
DeleteConsumerGroupsOptions> context) {
- return new Call("deleteConsumerGroups", context.getDeadline(), new
ConstantNodeIdProvider(context.getNode().get().id())) {
+ return new Call("deleteConsumerGroups", context.deadline(), new
ConstantNodeIdProvider(context.node().get().id())) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ DeleteGroupsRequest.Builder createRequest(int timeoutMs) {
return new DeleteGroupsRequest.Builder(
new DeleteGroupsRequestData()
-
.setGroupsNames(Collections.singletonList(context.getGroupId()))
+
.setGroupsNames(Collections.singletonList(context.groupId()))
);
}
@@ -3068,21 +3075,21 @@ public class KafkaAdminClient extends AdminClient {
final DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
// If coordinator changed since we fetched it, retry
- if (context.hasCoordinatorMoved(response)) {
- rescheduleTask(context, () ->
getDeleteConsumerGroupsCall(context));
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context, () ->
getDeleteConsumerGroupsCall(context));
return;
}
- final Errors groupError = response.get(context.getGroupId());
- if (handleGroupRequestError(groupError, context.getFuture()))
+ final Errors groupError = response.get(context.groupId());
+ if (handleGroupRequestError(groupError, context.future()))
return;
- context.getFuture().complete(null);
+ context.future().complete(null);
}
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
}
};
}
@@ -3115,10 +3122,10 @@ public class KafkaAdminClient extends AdminClient {
private Call getDeleteConsumerGroupOffsetsCall(
ConsumerGroupOperationContext<Map<TopicPartition, Errors>,
DeleteConsumerGroupOffsetsOptions> context,
Set<TopicPartition> partitions) {
- return new Call("deleteConsumerGroupOffsets", context.getDeadline(),
new ConstantNodeIdProvider(context.getNode().get().id())) {
+ return new Call("deleteConsumerGroupOffsets", context.deadline(), new
ConstantNodeIdProvider(context.node().get().id())) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ OffsetDeleteRequest.Builder createRequest(int timeoutMs) {
final OffsetDeleteRequestTopicCollection topics = new
OffsetDeleteRequestTopicCollection();
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
topicPartitions) -> {
@@ -3134,7 +3141,7 @@ public class KafkaAdminClient extends AdminClient {
return new OffsetDeleteRequest.Builder(
new OffsetDeleteRequestData()
- .setGroupId(context.groupId)
+ .setGroupId(context.groupId())
.setTopics(topics)
);
}
@@ -3144,14 +3151,14 @@ public class KafkaAdminClient extends AdminClient {
final OffsetDeleteResponse response = (OffsetDeleteResponse)
abstractResponse;
// If coordinator changed since we fetched it, retry
- if (context.hasCoordinatorMoved(response)) {
- rescheduleTask(context, () ->
getDeleteConsumerGroupOffsetsCall(context, partitions));
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context, () ->
getDeleteConsumerGroupOffsetsCall(context, partitions));
return;
}
// If the error is an error at the group level, the future is
failed with it
final Errors groupError =
Errors.forCode(response.data.errorCode());
- if (handleGroupRequestError(groupError, context.getFuture()))
+ if (handleGroupRequestError(groupError, context.future()))
return;
final Map<TopicPartition, Errors> partitions = new HashMap<>();
@@ -3163,12 +3170,12 @@ public class KafkaAdminClient extends AdminClient {
});
});
- context.getFuture().complete(partitions);
+ context.future().complete(partitions);
}
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
}
};
}
@@ -3189,7 +3196,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public ElectLeadersRequest.Builder createRequest(int timeoutMs) {
return new ElectLeadersRequest.Builder(electionType,
topicPartitions, timeoutMs);
}
@@ -3254,7 +3261,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- public AbstractRequest.Builder createRequest(int timeoutMs) {
+ public AlterPartitionReassignmentsRequest.Builder
createRequest(int timeoutMs) {
AlterPartitionReassignmentsRequestData data =
new AlterPartitionReassignmentsRequestData();
for (Map.Entry<String, Map<Integer,
Optional<NewPartitionReassignment>>> entry :
@@ -3389,7 +3396,7 @@ public class KafkaAdminClient extends AdminClient {
new ControllerNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
+ ListPartitionReassignmentsRequest.Builder createRequest(int
timeoutMs) {
ListPartitionReassignmentsRequestData listData = new
ListPartitionReassignmentsRequestData();
listData.setTimeoutMs(timeoutMs);
@@ -3482,12 +3489,12 @@ public class KafkaAdminClient extends AdminClient {
private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
return new Call("leaveGroup",
- context.getDeadline(),
- new
ConstantNodeIdProvider(context.getNode().get().id())) {
+ context.deadline(),
+ new ConstantNodeIdProvider(context.node().get().id()))
{
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new LeaveGroupRequest.Builder(context.getGroupId(),
-
context.getOptions().getMembers());
+ LeaveGroupRequest.Builder createRequest(int timeoutMs) {
+ return new LeaveGroupRequest.Builder(context.groupId(),
+
context.options().getMembers());
}
@Override
@@ -3495,8 +3502,8 @@ public class KafkaAdminClient extends AdminClient {
final LeaveGroupResponse response = (LeaveGroupResponse)
abstractResponse;
// If coordinator changed since we fetched it, retry
- if (context.hasCoordinatorMoved(response)) {
- rescheduleTask(context, () ->
getRemoveMembersFromGroupCall(context));
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context, () ->
getRemoveMembersFromGroupCall(context));
return;
}
@@ -3507,15 +3514,226 @@ public class KafkaAdminClient extends AdminClient {
}
final RemoveMemberFromGroupResult membershipChangeResult =
- new RemoveMemberFromGroupResult(response,
context.getOptions().getMembers());
+ new RemoveMemberFromGroupResult(response,
context.options().getMembers());
+
+ context.future().complete(membershipChangeResult);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ context.future().completeExceptionally(throwable);
+ }
+ };
+ }
+
+ @Override
+ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String
groupId,
+ Map<TopicPartition,
OffsetAndMetadata> offsets,
+
AlterConsumerGroupOffsetsOptions options) {
+ final KafkaFutureImpl<Map<TopicPartition, Errors>> future = new
KafkaFutureImpl<>();
+
+ final long startFindCoordinatorMs = time.milliseconds();
+ final long deadline = calcDeadlineMs(startFindCoordinatorMs,
options.timeoutMs());
+
+ ConsumerGroupOperationContext<Map<TopicPartition, Errors>,
AlterConsumerGroupOffsetsOptions> context =
+ new ConsumerGroupOperationContext<>(groupId, options,
deadline, future);
+
+ Call findCoordinatorCall = getFindCoordinatorCall(context,
+ () ->
KafkaAdminClient.this.getAlterConsumerGroupOffsetsCall(context, offsets));
+ runnable.call(findCoordinatorCall, startFindCoordinatorMs);
+
+ return new AlterConsumerGroupOffsetsResult(future);
+ }
+
+ private Call
getAlterConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<TopicPartition,
Errors>,
+
AlterConsumerGroupOffsetsOptions> context,
+ Map<TopicPartition,
OffsetAndMetadata> offsets) {
+
+ return new Call("commitOffsets", context.deadline(), new
ConstantNodeIdProvider(context.node().get().id())) {
+
+ @Override
+ OffsetCommitRequest.Builder createRequest(int timeoutMs) {
+ List<OffsetCommitRequestTopic> topics = new ArrayList<>();
+ Map<String, List<OffsetCommitRequestPartition>> offsetData =
new HashMap<>();
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
offsets.entrySet()) {
+ String topic = entry.getKey().topic();
+ OffsetAndMetadata oam = entry.getValue();
+ offsetData.compute(topic, (key, value) -> {
+ if (value == null) {
+ value = new ArrayList<>();
+ }
+ OffsetCommitRequestPartition partition = new
OffsetCommitRequestPartition()
+ .setCommittedOffset(oam.offset())
+
.setCommittedLeaderEpoch(oam.leaderEpoch().orElse(-1))
+ .setCommittedMetadata(oam.metadata())
+ .setPartitionIndex(entry.getKey().partition());
+ value.add(partition);
+ return value;
+ });
+ }
+ for (Map.Entry<String, List<OffsetCommitRequestPartition>>
entry : offsetData.entrySet()) {
+ OffsetCommitRequestTopic topic = new
OffsetCommitRequestTopic()
+ .setName(entry.getKey())
+ .setPartitions(entry.getValue());
+ topics.add(topic);
+ }
+ OffsetCommitRequestData data = new OffsetCommitRequestData()
+ .setGroupId(context.groupId())
+ .setTopics(topics);
+ return new OffsetCommitRequest.Builder(data);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final OffsetCommitResponse response = (OffsetCommitResponse)
abstractResponse;
- context.getFuture().complete(membershipChangeResult);
+ // If coordinator changed since we fetched it, retry
+ if
(ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
+ rescheduleFindCoordinatorTask(context,
+ () -> getAlterConsumerGroupOffsetsCall(context,
offsets));
+ return;
+ }
+
+ // If there is a coordinator error, retry
+ for (OffsetCommitResponseTopic topic :
response.data().topics()) {
+ for (OffsetCommitResponsePartition partition :
topic.partitions()) {
+ Errors error = Errors.forCode(partition.errorCode());
+ if
(ConsumerGroupOperationContext.shouldRefreshCoordinator(error)) {
+ rescheduleFindCoordinatorTask(context,
+ () ->
getAlterConsumerGroupOffsetsCall(context, offsets));
+ return;
+ }
+ }
+ }
+
+ final Map<TopicPartition, Errors> partitions = new HashMap<>();
+ for (OffsetCommitResponseTopic topic :
response.data().topics()) {
+ for (OffsetCommitResponsePartition partition :
topic.partitions()) {
+ TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
+ Errors error = Errors.forCode(partition.errorCode());
+ partitions.put(tp, error);
+ }
+ }
+ context.future().complete(partitions);
}
@Override
void handleFailure(Throwable throwable) {
- context.getFuture().completeExceptionally(throwable);
+ context.future().completeExceptionally(throwable);
}
};
}
+
+ @Override
+ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
topicPartitionOffsets,
+ ListOffsetsOptions options) {
+
+ // preparing topics list for asking metadata about them
+ final Map<TopicPartition, KafkaFutureImpl<ListOffsetsResultInfo>>
futures = new HashMap<>(topicPartitionOffsets.size());
+ final Set<String> topics = new HashSet<>();
+ for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) {
+ topics.add(topicPartition.topic());
+ futures.put(topicPartition, new KafkaFutureImpl<>());
+ }
+
+ final long nowMetadata = time.milliseconds();
+ final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+
+ MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions>
context =
+ new MetadataOperationContext<>(topics, options, deadline,
futures);
+
+ Call metadataCall = getMetadataCall(context,
+ () -> KafkaAdminClient.this.getListOffsetsCalls(context,
topicPartitionOffsets, futures));
+ runnable.call(metadataCall, nowMetadata);
+
+ return new ListOffsetsResult(new HashMap<>(futures));
+ }
+
+ private List<Call>
getListOffsetsCalls(MetadataOperationContext<ListOffsetsResultInfo,
ListOffsetsOptions> context,
+ Map<TopicPartition, OffsetSpec>
topicPartitionOffsets,
+ Map<TopicPartition,
KafkaFutureImpl<ListOffsetsResultInfo>> futures) {
+
+ MetadataResponse mr = context.response().orElseThrow(() -> new
IllegalStateException("No Metadata response"));
+ List<Call> calls = new ArrayList<>();
+ // grouping topic partitions per leader
+ Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>
leaders = new HashMap<>();
+
+ for (Map.Entry<TopicPartition, OffsetSpec> entry:
topicPartitionOffsets.entrySet()) {
+
+ OffsetSpec offsetSpec = entry.getValue();
+ TopicPartition tp = entry.getKey();
+ KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
+ long offsetQuery = (offsetSpec instanceof TimestampSpec)
+ ? ((TimestampSpec) offsetSpec).timestamp()
+ : (offsetSpec instanceof OffsetSpec.EarliestSpec)
+ ? ListOffsetRequest.EARLIEST_TIMESTAMP
+ : ListOffsetRequest.LATEST_TIMESTAMP;
+ // avoid sending listOffsets request for topics with errors
+ if (!mr.errors().containsKey(tp.topic())) {
+ Node node = mr.cluster().leaderFor(tp);
+ if (node != null) {
+ Map<TopicPartition, ListOffsetRequest.PartitionData>
leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
+ leadersOnNode.put(tp, new
ListOffsetRequest.PartitionData(offsetQuery, Optional.empty()));
+ } else {
+
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
+ }
+ } else {
+
future.completeExceptionally(mr.errors().get(tp.topic()).exception());
+ }
+ }
+
+ for (final Map.Entry<Node, Map<TopicPartition,
ListOffsetRequest.PartitionData>> entry: leaders.entrySet()) {
+ final int brokerId = entry.getKey().id();
+ final Map<TopicPartition, ListOffsetRequest.PartitionData>
partitionsToQuery = entry.getValue();
+
+ calls.add(new Call("listOffsets on broker " + brokerId,
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+
+ @Override
+ ListOffsetRequest.Builder createRequest(int timeoutMs) {
+ return ListOffsetRequest.Builder
+ .forConsumer(true,
context.options().isolationLevel())
+ .setTargetTimes(partitionsToQuery);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ ListOffsetResponse response = (ListOffsetResponse)
abstractResponse;
+ Set<TopicPartition> partitionsWithErrors = new HashSet<>();
+
+ for (Entry<TopicPartition, PartitionData> result :
response.responseData().entrySet()) {
+ TopicPartition tp = result.getKey();
+ PartitionData partitionData = result.getValue();
+
+ KafkaFutureImpl<ListOffsetsResultInfo> future =
futures.get(tp);
+ Errors error = partitionData.error;
+ if
(MetadataOperationContext.shouldRefreshMetadata(error)) {
+ partitionsWithErrors.add(tp);
+ } else if (error == Errors.NONE) {
+ future.complete(new
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp,
partitionData.leaderEpoch));
+ } else {
+ future.completeExceptionally(error.exception());
+ }
+ }
+
+ if (!partitionsWithErrors.isEmpty()) {
+
partitionsToQuery.keySet().retainAll(partitionsWithErrors);
+ Set<String> retryTopics =
partitionsWithErrors.stream().map(tp -> tp.topic()).collect(Collectors.toSet());
+ MetadataOperationContext<ListOffsetsResultInfo,
ListOffsetsOptions> retryContext =
+ new MetadataOperationContext<>(retryTopics,
context.options(), context.deadline(), futures);
+ rescheduleMetadataTask(retryContext, () ->
Collections.singletonList(this));
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ for (TopicPartition tp : entry.getValue().keySet()) {
+ KafkaFutureImpl<ListOffsetsResultInfo> future =
futures.get(tp);
+ future.completeExceptionally(throwable);
+ }
+ }
+ });
+ }
+ return calls;
+ }
+
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
similarity index 53%
copy from
clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
copy to
clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
index a09b625..0e116e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsOptions.java
@@ -14,29 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.clients.admin;
-public enum IsolationLevel {
- READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.annotation.InterfaceStability;
- private final byte id;
+/**
+ * Options for {@link AdminClient#listOffsets(Map)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class ListOffsetsOptions extends AbstractOptions<ListOffsetsOptions> {
+
+ private final IsolationLevel isolationLevel;
- IsolationLevel(byte id) {
- this.id = id;
+ public ListOffsetsOptions() {
+ this(IsolationLevel.READ_UNCOMMITTED);
}
- public byte id() {
- return id;
+ public ListOffsetsOptions(IsolationLevel isolationLevel) {
+ this.isolationLevel = isolationLevel;
}
- public static IsolationLevel forId(byte id) {
- switch (id) {
- case 0:
- return READ_UNCOMMITTED;
- case 1:
- return READ_COMMITTED;
- default:
- throw new IllegalArgumentException("Unknown isolation level "
+ id);
- }
+ public IsolationLevel isolationLevel() {
+ return isolationLevel;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
new file mode 100644
index 0000000..d830ef2
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The result of the {@link AdminClient#listOffsets(Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
[email protected]
+public class ListOffsetsResult {
+
+ private final Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>>
futures;
+
+ ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>>
futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a future which can be used to check the result for a given
partition.
+ */
+ public KafkaFuture<ListOffsetsResultInfo> partitionResult(final
TopicPartition partition) {
+ KafkaFuture<ListOffsetsResultInfo> future = futures.get(partition);
+ if (future == null) {
+ throw new IllegalArgumentException(
+ "List Offsets for partition \"" + partition + "\" was not
attempted");
+ }
+ return future;
+ }
+
+ /**
+ * Return a future which succeeds only if offsets for all specified
partitions have been successfully
+ * retrieved.
+ */
+ public KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
+ .thenApply(new KafkaFuture.BaseFunction<Void,
Map<TopicPartition, ListOffsetsResultInfo>>() {
+ @Override
+ public Map<TopicPartition, ListOffsetsResultInfo>
apply(Void v) {
+ Map<TopicPartition, ListOffsetsResultInfo> offsets =
new HashMap<>(futures.size());
+ for (Map.Entry<TopicPartition,
KafkaFuture<ListOffsetsResultInfo>> entry : futures.entrySet()) {
+ try {
+ offsets.put(entry.getKey(),
entry.getValue().get());
+ } catch (InterruptedException | ExecutionException
e) {
+ // This should be unreachable, because allOf
ensured that all the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return offsets;
+ }
+ });
+ }
+
+ public static class ListOffsetsResultInfo {
+
+ private final long offset;
+ private final long timestamp;
+ private final Optional<Integer> leaderEpoch;
+
+ ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer>
leaderEpoch) {
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.leaderEpoch = leaderEpoch;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public long timestamp() {
+ return timestamp;
+ }
+
+ public Optional<Integer> leaderEpoch() {
+ return leaderEpoch;
+ }
+
+ @Override
+ public String toString() {
+ return "ListOffsetsResultInfo(offset=" + offset + ", timestamp=" +
timestamp + ", leaderEpoch="
+ + leaderEpoch + ")";
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
new file mode 100644
index 0000000..8955b41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+
+/**
+ * This class allows specifying the desired offsets when using {@link
KafkaAdminClient#listOffsets(Map, ListOffsetsOptions)}
+ */
+public class OffsetSpec {
+
+ static class EarliestSpec extends OffsetSpec { }
+ static class LatestSpec extends OffsetSpec { }
+ static class TimestampSpec extends OffsetSpec {
+ private final long timestamp;
+
+ TimestampSpec(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ long timestamp() {
+ return timestamp;
+ }
+ }
+
+ /**
+ * Used to retrieve the latest offset of a partition
+ */
+ public static OffsetSpec latest() {
+ return new LatestSpec();
+ }
+
+ /**
+ * Used to retrieve the earliest offset of a partition
+ */
+ public static OffsetSpec earliest() {
+ return new EarliestSpec();
+ }
+
+ /**
+ * Used to retrieve the earliest offset whose timestamp is greater than
+ * or equal to the given timestamp in the corresponding partition
+ * @param timestamp in milliseconds
+ */
+ public static OffsetSpec forTimestamp(long timestamp) {
+ return new TimestampSpec(timestamp);
+ }
+
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java
new file mode 100644
index 0000000..bd4415c
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ConsumerGroupOperationContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.Optional;
+
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+/**
+ * Context class to encapsulate parameters of a call to find and use a
consumer group coordinator.
+ * Some of the parameters are provided at construction and are immutable,
whereas others are provided
+ * as "Call"s are completed and values become available, like the node id of the
coordinator.
+ *
+ * @param <T> The type of return value of the KafkaFuture
+ * @param <O> The type of configuration option. Different for different
consumer group commands.
+ */
+public final class ConsumerGroupOperationContext<T, O extends
AbstractOptions<O>> {
+ final private String groupId;
+ final private O options;
+ final private long deadline;
+ final private KafkaFutureImpl<T> future;
+ private Optional<Node> node;
+
+ public ConsumerGroupOperationContext(String groupId,
+ O options,
+ long deadline,
+ KafkaFutureImpl<T> future) {
+ this.groupId = groupId;
+ this.options = options;
+ this.deadline = deadline;
+ this.future = future;
+ this.node = Optional.empty();
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+ public O options() {
+ return options;
+ }
+
+ public long deadline() {
+ return deadline;
+ }
+
+ public KafkaFutureImpl<T> future() {
+ return future;
+ }
+
+ public Optional<Node> node() {
+ return node;
+ }
+
+ public void setNode(Node node) {
+ this.node = Optional.ofNullable(node);
+ }
+
+ public static boolean hasCoordinatorMoved(AbstractResponse response) {
+ return response.errorCounts().keySet()
+ .stream()
+ .anyMatch(error -> error == Errors.NOT_COORDINATOR);
+ }
+
+ public static boolean shouldRefreshCoordinator(Errors error) {
+ return error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error ==
Errors.COORDINATOR_NOT_AVAILABLE;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
new file mode 100644
index 0000000..e6f4054
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
+
+/**
+ * Context class to encapsulate parameters of a call to fetch and use cluster
metadata.
+ * Some of the parameters are provided at construction and are immutable
whereas others are provided
+ * as "Call" are completed and values are available.
+ *
+ * @param <T> The type of return value of the KafkaFuture
+ * @param <O> The type of configuration option.
+ */
+public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
+ final private Collection<String> topics;
+ final private O options;
+ final private long deadline;
+ final private Map<TopicPartition, KafkaFutureImpl<T>> futures;
+ private Optional<MetadataResponse> response;
+
+ public MetadataOperationContext(Collection<String> topics,
+ O options,
+ long deadline,
+ Map<TopicPartition, KafkaFutureImpl<T>>
futures) {
+ this.topics = topics;
+ this.options = options;
+ this.deadline = deadline;
+ this.futures = futures;
+ this.response = Optional.empty();
+ }
+
+ public void setResponse(Optional<MetadataResponse> response) {
+ this.response = response;
+ }
+
+ public Optional<MetadataResponse> response() {
+ return response;
+ }
+
+ public O options() {
+ return options;
+ }
+
+ public long deadline() {
+ return deadline;
+ }
+
+ public Map<TopicPartition, KafkaFutureImpl<T>> futures() {
+ return futures;
+ }
+
+ public Collection<String> topics() {
+ return topics;
+ }
+
+ public static void handleMetadataErrors(MetadataResponse response) {
+ for (TopicMetadata tm : response.topicMetadata()) {
+ for (PartitionMetadata pm : tm.partitionMetadata()) {
+ if (shouldRefreshMetadata(pm.error())) {
+ throw pm.error().exception();
+ }
+ }
+ }
+ }
+
+ public static boolean shouldRefreshMetadata(Errors error) {
+ return error.exception() instanceof InvalidMetadataException;
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e0f7c4d..67897cf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,13 +18,13 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Collections;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f12beaf..e9e91bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -35,6 +35,7 @@ import
org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -52,7 +53,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 384da31..a162ddb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
@@ -64,7 +65,6 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 953505f..bd05909 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -22,11 +22,11 @@ import
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.requests.EpochEndOffset;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
similarity index 96%
rename from
clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
rename to clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
index a09b625..79f0a92 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IsolationLevel.java
+++ b/clients/src/main/java/org/apache/kafka/common/IsolationLevel.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.requests;
+package org.apache.kafka.common;
public enum IsolationLevel {
READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index 963ad06..3fbf13d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -102,7 +102,7 @@ public class AlterConfigsRequest extends AbstractRequest {
}
- public static class Builder extends AbstractRequest.Builder {
+ public static class Builder extends
AbstractRequest.Builder<AlterConfigsRequest> {
private final Map<ConfigResource, Config> configs;
private final boolean validateOnly;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 0ee256f..99bbdd7 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -65,7 +65,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0,
DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
}
- public static class Builder extends AbstractRequest.Builder {
+ public static class Builder extends
AbstractRequest.Builder<DescribeConfigsRequest> {
private final Map<ConfigResource, Collection<String>>
resourceToConfigNames;
private boolean includeSynonyms;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 2c0455a..f99409e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e9fe942..094d0d2 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -217,7 +218,6 @@ public class ListOffsetRequest extends AbstractRequest {
/**
* Private constructor with a specified version.
*/
- @SuppressWarnings("unchecked")
private ListOffsetRequest(int replicaId,
Map<TopicPartition, PartitionData> targetTimes,
IsolationLevel isolationLevel,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6d870fc..4d677bf 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -102,9 +103,14 @@ import
org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData;
+import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.resource.PatternType;
@@ -276,6 +282,12 @@ public class KafkaAdminClientTest {
);
}
+    /**
+     * Builds an OffsetCommitResponse whose response data holds exactly one entry: {@code tp} mapped to {@code error}.
+     * The first constructor argument is always 0 (presumably the throttle time - confirm against OffsetCommitResponse).
+     */
+    private static OffsetCommitResponse prepareOffsetCommitResponse(TopicPartition tp, Errors error) {
+        return new OffsetCommitResponse(0, Collections.singletonMap(tp, error));
+    }
+
private static CreateTopicsResponse prepareCreateTopicsResponse(String
topicName, Errors error) {
CreateTopicsResponseData data = new CreateTopicsResponseData();
data.topics().add(new CreatableTopicResult().
@@ -295,6 +307,31 @@ public class KafkaAdminClientTest {
return FindCoordinatorResponse.prepareResponse(error, node);
}
+    /**
+     * Builds a MetadataResponse that mirrors the given cluster: one TopicMetadata per topic, containing one
+     * PartitionMetadata per available partition of that topic. The same {@code error} is used for every topic
+     * and every partition.
+     */
+    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        List<TopicMetadata> topicMetadataList = new ArrayList<>();
+        for (String topicName : cluster.topics()) {
+            List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
+            for (PartitionInfo info : cluster.availablePartitionsForTopic(topicName)) {
+                // Optional.of(234) is a fixed dummy value (presumably the leader epoch - confirm against PartitionMetadata).
+                partitionMetadataList.add(new PartitionMetadata(
+                    error,
+                    info.partition(),
+                    info.leader(),
+                    Optional.of(234),
+                    Arrays.asList(info.replicas()),
+                    Arrays.asList(info.inSyncReplicas()),
+                    Arrays.asList(info.offlineReplicas())));
+            }
+            topicMetadataList.add(new TopicMetadata(error, topicName, false, partitionMetadataList));
+        }
+
+        int controllerId = cluster.controller().id();
+        String clusterId = cluster.clusterResource().clusterId();
+        return MetadataResponse.prepareResponse(
+            0,
+            cluster.nodes(),
+            clusterId,
+            controllerId,
+            topicMetadataList,
+            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+    }
+
/**
* Test that the client properly times out when we don't receive any
metadata.
*/
@@ -1451,7 +1488,8 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-
env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
env.cluster().controller()));
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
data.groups().add(DescribeGroupsResponse.groupMetadata(
@@ -1567,13 +1605,15 @@ public class KafkaAdminClientTest {
assertNull(results.get());
//should throw error for non-retriable errors
-
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
+ env.kafkaClient().prepareResponse(
+
prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
final DeleteConsumerGroupsResult errorResult =
env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"),
GroupAuthorizationException.class);
//Retriable errors should be retried
-
env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
env.cluster().controller()));
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
final DeletableGroupResultCollection errorResponse1 = new
DeletableGroupResultCollection();
errorResponse1.add(new DeletableGroupResult()
@@ -1698,7 +1738,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(
- FindCoordinatorResponse.prepareResponse(Errors.NONE,
env.cluster().controller()));
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE));
@@ -1752,8 +1792,8 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
for (Errors error : retriableErrors) {
- env.kafkaClient().prepareResponse(FindCoordinatorResponse
- .prepareResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
prepareOffsetDeleteResponse(error));
@@ -2022,8 +2062,8 @@ public class KafkaAdminClientTest {
.setPartitions(Collections.singletonList(normalPartitionResponse))));
env.kafkaClient().prepareResponse(new
AlterPartitionReassignmentsResponse(responseData1));
AlterPartitionReassignmentsResult result1 =
env.adminClient().alterPartitionReassignments(reassignments);
- Future future1 = result1.all();
- Future future2 = result1.values().get(tp1);
+ Future<Void> future1 = result1.all();
+ Future<Void> future2 = result1.values().get(tp1);
TestUtils.assertFutureError(future1, UnknownServerException.class);
TestUtils.assertFutureError(future2, UnknownServerException.class);
@@ -2208,6 +2248,466 @@ public class KafkaAdminClientTest {
}
}
+    @Test
+    public void testAlterConsumerGroupOffsets() throws Exception {
+        // Happy path: one coordinator lookup followed by one offset commit that succeeds for both partitions.
+        final Node broker = new Node(0, "localhost", 8121);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            Collections.singletonList(broker),
+            Collections.<PartitionInfo>emptyList(),
+            Collections.<String>emptySet(),
+            Collections.<String>emptySet(),
+            broker);
+
+        final String groupId = "group-0";
+        final TopicPartition fooPartition = new TopicPartition("foo", 0);
+        final TopicPartition barPartition = new TopicPartition("bar", 0);
+        // Not part of the request, so asking the result for it must fail.
+        final TopicPartition unrequestedPartition = new TopicPartition("foobar", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Queue the coordinator lookup response first, then the commit response.
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            Map<TopicPartition, Errors> commitErrors = new HashMap<>();
+            commitErrors.put(fooPartition, Errors.NONE);
+            commitErrors.put(barPartition, Errors.NONE);
+            env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, commitErrors));
+
+            Map<TopicPartition, OffsetAndMetadata> requestedOffsets = new HashMap<>();
+            requestedOffsets.put(fooPartition, new OffsetAndMetadata(123L));
+            requestedOffsets.put(barPartition, new OffsetAndMetadata(456L));
+            final AlterConsumerGroupOffsetsResult outcome =
+                env.adminClient().alterConsumerGroupOffsets(groupId, requestedOffsets);
+
+            assertNull(outcome.all().get());
+            assertNull(outcome.partitionResult(fooPartition).get());
+            assertNull(outcome.partitionResult(barPartition).get());
+            TestUtils.assertFutureError(outcome.partitionResult(unrequestedPartition), IllegalArgumentException.class);
+        }
+    }
+
+ @Test
+ public void testAlterConsumerGroupOffsetsRetriableErrors() throws
Exception {
+ // Retriable commit errors should be retried until a commit response without error arrives
+ // Single-node cluster; node 0 is also the last Cluster argument (used below via env.cluster().controller()).
+ final Map<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ final String groupId = "group-0";
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ // Queued pair 1: successful coordinator lookup, then a commit failing with COORDINATOR_NOT_AVAILABLE.
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1,
Errors.COORDINATOR_NOT_AVAILABLE));
+ // Queued pair 2: successful coordinator lookup, then a commit failing with COORDINATOR_LOAD_IN_PROGRESS.
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1,
Errors.COORDINATOR_LOAD_IN_PROGRESS));
+ // Queued pair 3: successful coordinator lookup, then a commit failing with NOT_COORDINATOR.
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
+ // Queued pair 4: successful coordinator lookup, then a commit without error.
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1, Errors.NONE));
+ // All responses are queued before the request is issued.
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp1, new OffsetAndMetadata(123L));
+ final AlterConsumerGroupOffsetsResult result1 = env.adminClient()
+ .alterConsumerGroupOffsets(groupId, offsets);
+ // Both the aggregate future and the per-partition future must complete with a null value.
+ assertNull(result1.all().get());
+ assertNull(result1.partitionResult(tp1).get());
+ }
+ }
+
+ @Test
+ public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws
Exception {
+ // Non-retriable errors throw an exception
+
+ final Map<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ final String groupId = "group-0";
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+ final List<Errors> nonRetriableErrors = Arrays.asList(
+ Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID,
Errors.GROUP_ID_NOT_FOUND);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ for (Errors error : nonRetriableErrors) {
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+
env.kafkaClient().prepareResponse(prepareOffsetCommitResponse(tp1, error));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new
HashMap<>();
+ offsets.put(tp1, new OffsetAndMetadata(123L));
+ AlterConsumerGroupOffsetsResult errorResult = env.adminClient()
+ .alterConsumerGroupOffsets(groupId, offsets);
+
+ TestUtils.assertFutureError(errorResult.all(),
error.exception().getClass());
+ TestUtils.assertFutureError(errorResult.partitionResult(tp1),
error.exception().getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testAlterConsumerGroupOffsetsFindCoordinatorRetriableErrors()
throws Exception {
+ // Retriable FindCoordinatorResponse errors should be retried until a lookup succeeds
+
+ final Map<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ final String groupId = "group-0";
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ // Two failing lookups are queued first: COORDINATOR_NOT_AVAILABLE, then COORDINATOR_LOAD_IN_PROGRESS.
+ env.kafkaClient().prepareResponse(
+
prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode()));
+ env.kafkaClient().prepareResponse(
+
prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Node.noNode()));
+ // Third lookup succeeds and names the cluster controller as the coordinator.
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+ // The single commit response queued after the lookups carries no error.
+ env.kafkaClient().prepareResponse(
+ prepareOffsetCommitResponse(tp1, Errors.NONE));
+ // All responses are queued before the request is issued.
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp1, new OffsetAndMetadata(123L));
+ final AlterConsumerGroupOffsetsResult result = env.adminClient()
+ .alterConsumerGroupOffsets(groupId, offsets);
+ // Both the aggregate future and the per-partition future must complete with a null value.
+ assertNull(result.all().get());
+ assertNull(result.partitionResult(tp1).get());
+ }
+ }
+
+ @Test
+ public void
testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() throws
Exception {
+ // Non-retriable FindCoordinatorResponse errors throw an exception
+
+ final Map<Integer, Node> nodes = new HashMap<>();
+ nodes.put(0, new Node(0, "localhost", 8121));
+
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes.values(),
+ Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(), nodes.get(0));
+
+ final String groupId = "group-0";
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(
+
prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Node.noNode()));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp1, new OffsetAndMetadata(123L));
+ final AlterConsumerGroupOffsetsResult errorResult =
env.adminClient()
+ .alterConsumerGroupOffsets(groupId, offsets);
+
+ TestUtils.assertFutureError(errorResult.all(),
GroupAuthorizationException.class);
+ TestUtils.assertFutureError(errorResult.partitionResult(tp1),
GroupAuthorizationException.class);
+ }
+ }
+
+    @Test
+    public void testListOffsets() throws Exception {
+        // Happy path: one metadata response followed by one list-offsets response covering all three partitions.
+        final Node broker = new Node(0, "localhost", 8120);
+        final List<PartitionInfo> partitionInfos = Arrays.asList(
+            new PartitionInfo("foo", 0, broker, new Node[]{broker}, new Node[]{broker}),
+            new PartitionInfo("bar", 0, broker, new Node[]{broker}, new Node[]{broker}),
+            new PartitionInfo("baz", 0, broker, new Node[]{broker}, new Node[]{broker}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            Collections.singletonList(broker),
+            partitionInfos,
+            Collections.<String>emptySet(),
+            Collections.<String>emptySet(),
+            broker);
+
+        final TopicPartition fooPartition = new TopicPartition("foo", 0);
+        final TopicPartition barPartition = new TopicPartition("bar", 0);
+        final TopicPartition bazPartition = new TopicPartition("baz", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // PartitionData arguments: error, timestamp, offset, leader epoch (see the assertions below).
+            Map<TopicPartition, PartitionData> listOffsetData = new HashMap<>();
+            listOffsetData.put(fooPartition, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
+            listOffsetData.put(barPartition, new PartitionData(Errors.NONE, -1L, 234L, Optional.of(432)));
+            listOffsetData.put(bazPartition, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(listOffsetData));
+
+            Map<TopicPartition, OffsetSpec> requestedSpecs = new HashMap<>();
+            requestedSpecs.put(fooPartition, OffsetSpec.latest());
+            requestedSpecs.put(barPartition, OffsetSpec.earliest());
+            requestedSpecs.put(bazPartition, OffsetSpec.forTimestamp(System.currentTimeMillis()));
+            ListOffsetsResult result = env.adminClient().listOffsets(requestedSpecs);
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+
+            ListOffsetsResultInfo fooInfo = offsets.get(fooPartition);
+            assertEquals(123L, fooInfo.offset());
+            assertEquals(321, fooInfo.leaderEpoch().get().intValue());
+            assertEquals(-1L, fooInfo.timestamp());
+
+            ListOffsetsResultInfo barInfo = offsets.get(barPartition);
+            assertEquals(234L, barInfo.offset());
+            assertEquals(432, barInfo.leaderEpoch().get().intValue());
+            assertEquals(-1L, barInfo.timestamp());
+
+            ListOffsetsResultInfo bazInfo = offsets.get(bazPartition);
+            assertEquals(345L, bazInfo.offset());
+            assertEquals(543, bazInfo.leaderEpoch().get().intValue());
+            assertEquals(123456789L, bazInfo.timestamp());
+
+            // The per-partition futures must expose the same values as the aggregate map.
+            assertEquals(fooInfo, result.partitionResult(fooPartition).get());
+            assertEquals(barInfo, result.partitionResult(barPartition).get());
+            assertEquals(bazInfo, result.partitionResult(bazPartition).get());
+
+            // A partition that was never requested is rejected with IllegalArgumentException.
+            try {
+                result.partitionResult(new TopicPartition("unknown", 0)).get();
+                fail("should have thrown IllegalArgumentException");
+            } catch (IllegalArgumentException expected) { }
+        }
+    }
+
+ @Test
+ public void testListOffsetsRetriableErrors() throws Exception {
+ // A partition answered with LEADER_NOT_AVAILABLE is retried after another metadata response; the others keep their first answer
+ Node node0 = new Node(0, "localhost", 8120);
+ Node node1 = new Node(1, "localhost", 8121);
+ List<Node> nodes = Arrays.asList(node0, node1);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0,
node1}, new Node[]{node0, node1}));
+ pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0,
node1}, new Node[]{node0, node1}));
+ pInfos.add(new PartitionInfo("bar", 0, node1, new Node[]{node1,
node0}, new Node[]{node1, node0}));
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(),
+ node0);
+
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+ final TopicPartition tp2 = new TopicPartition("foo", 1);
+ final TopicPartition tp3 = new TopicPartition("bar", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ // Initial metadata response, built from the cluster above with no errors.
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+ // listoffsets response from broker 0 - NOTE(review): holds tp3 (bar-0), whose leader above is node1; confirm the grouping is intended
+ Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+ responseData.put(tp1, new
PartitionData(Errors.LEADER_NOT_AVAILABLE, -1L, 123L, Optional.of(321)));
+ responseData.put(tp3, new PartitionData(Errors.NONE, -1L, 987L,
Optional.of(789)));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+ // listoffsets response from broker 1 - NOTE(review): holds tp2 (foo-1), whose leader above is node0; confirm the grouping is intended
+ responseData = new HashMap<>();
+ responseData.put(tp2, new PartitionData(Errors.NONE, -1L, 456L,
Optional.of(654)));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+
+ // metadata refresh because of LEADER_NOT_AVAILABLE
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+ // listoffsets response from broker 0, this time answering tp1 without error
+ responseData = new HashMap<>();
+ responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 345L,
Optional.of(543)));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+ // All responses are queued before the request is issued.
+ Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+ partitions.put(tp1, OffsetSpec.latest());
+ partitions.put(tp2, OffsetSpec.latest());
+ partitions.put(tp3, OffsetSpec.latest());
+ ListOffsetsResult result =
env.adminClient().listOffsets(partitions);
+ // tp1 must carry the values of its second answer (345/543), not the failed first one (123/321).
+ Map<TopicPartition, ListOffsetsResultInfo> offsets =
result.all().get();
+ assertFalse(offsets.isEmpty());
+ assertEquals(345L, offsets.get(tp1).offset());
+ assertEquals(543, offsets.get(tp1).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp1).timestamp());
+ assertEquals(456, offsets.get(tp2).offset());
+ assertEquals(654, offsets.get(tp2).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp2).timestamp());
+ assertEquals(987, offsets.get(tp3).offset());
+ assertEquals(789, offsets.get(tp3).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp3).timestamp());
+ }
+ }
+
+ /**
+ * Checks that a partition-level TOPIC_AUTHORIZATION_FAILED error carried in a
+ * ListOffsetResponse makes the future returned by ListOffsetsResult.all()
+ * fail with TopicAuthorizationException.
+ */
+ @Test
+ public void testListOffsetsNonRetriableErrors() throws Exception {
+
+ // Mock cluster: two nodes and one partition, foo-0, registered against node0.
+ Node node0 = new Node(0, "localhost", 8120);
+ Node node1 = new Node(1, "localhost", 8121);
+ List<Node> nodes = Arrays.asList(node0, node1);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0,
node1}, new Node[]{node0, node1}));
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(),
+ node0);
+
+ // Note: despite its name, tp1 refers to partition 0 of topic "foo".
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // First queued response: metadata built from the cluster with Errors.NONE.
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+
+ // Second queued response: ListOffsets data whose only entry reports
+ // TOPIC_AUTHORIZATION_FAILED for the requested partition.
+ Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+ responseData.put(tp1, new
PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1, Optional.empty()));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+
+ // Request the latest offset of the single partition.
+ Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+ partitions.put(tp1, OffsetSpec.latest());
+ ListOffsetsResult result =
env.adminClient().listOffsets(partitions);
+
+ // The aggregated future must fail with the authorization exception.
+ TestUtils.assertFutureError(result.all(),
TopicAuthorizationException.class);
+ }
+ }
+
+ /**
+ * Checks that listOffsets still succeeds when the first two metadata responses
+ * are built with LEADER_NOT_AVAILABLE and UNKNOWN_TOPIC_OR_PARTITION and only
+ * the third is built with Errors.NONE: the final result must contain the
+ * offsets supplied by the two queued ListOffsetResponses.
+ */
+ @Test
+ public void testListOffsetsMetadataRetriableErrors() throws Exception {
+
+ // Mock cluster: two nodes; foo-0 is registered against node0 and foo-1
+ // against node1, each node being the only replica of its partition.
+ Node node0 = new Node(0, "localhost", 8120);
+ Node node1 = new Node(1, "localhost", 8121);
+ List<Node> nodes = Arrays.asList(node0, node1);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new
Node[]{node0}));
+ pInfos.add(new PartitionInfo("foo", 1, node1, new Node[]{node1}, new
Node[]{node1}));
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(),
+ node0);
+
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+ final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Three metadata responses are queued: two built with an error code,
+ // then one built with Errors.NONE.
+ // NOTE(review): presumably the mock client hands out prepared responses
+ // in the order they were queued - confirm against MockClient.
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.LEADER_NOT_AVAILABLE));
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.UNKNOWN_TOPIC_OR_PARTITION));
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+
+ // listoffsets response from broker 0
+ Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+ responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L,
Optional.of(543)));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+ // listoffsets response from broker 1
+ responseData = new HashMap<>();
+ responseData.put(tp1, new PartitionData(Errors.NONE, -1L, 789L,
Optional.of(987)));
+ env.kafkaClient().prepareResponse(new
ListOffsetResponse(responseData));
+
+ // Ask for the latest offset of both partitions in a single call.
+ Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+ partitions.put(tp0, OffsetSpec.latest());
+ partitions.put(tp1, OffsetSpec.latest());
+ ListOffsetsResult result =
env.adminClient().listOffsets(partitions);
+
+ // Each partition must report the offset, leader epoch and timestamp (-1)
+ // that were placed in its PartitionData above.
+ Map<TopicPartition, ListOffsetsResultInfo> offsets =
result.all().get();
+ assertFalse(offsets.isEmpty());
+ assertEquals(345L, offsets.get(tp0).offset());
+ assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp0).timestamp());
+ assertEquals(789L, offsets.get(tp1).offset());
+ assertEquals(987, offsets.get(tp1).leaderEpoch().get().intValue());
+ assertEquals(-1L, offsets.get(tp1).timestamp());
+ }
+ }
+
+ /**
+ * Checks that a metadata response built with TOPIC_AUTHORIZATION_FAILED makes
+ * the future returned by ListOffsetsResult.all() fail with
+ * TopicAuthorizationException. No ListOffsetResponse is queued in this test.
+ */
+ @Test
+ public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
+
+ // Mock cluster: two nodes and one partition, foo-0, registered against node0.
+ Node node0 = new Node(0, "localhost", 8120);
+ Node node1 = new Node(1, "localhost", 8121);
+ List<Node> nodes = Arrays.asList(node0, node1);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0,
node1}, new Node[]{node0, node1}));
+ final Cluster cluster =
+ new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.<String>emptySet(),
+ Collections.<String>emptySet(),
+ node0);
+
+ // Note: despite its name, tp1 refers to partition 0 of topic "foo".
+ final TopicPartition tp1 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster))
{
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // The only queued response is a metadata response carrying the
+ // authorization error.
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.TOPIC_AUTHORIZATION_FAILED));
+
+ Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+ partitions.put(tp1, OffsetSpec.latest());
+ ListOffsetsResult result =
env.adminClient().listOffsets(partitions);
+
+ // The aggregated future must fail with the authorization exception.
+ TestUtils.assertFutureError(result.all(),
TopicAuthorizationException.class);
+ }
+ }
+
private static MemberDescription
convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 017e447..3ec3b30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
@@ -443,6 +444,16 @@ public class MockAdminClient extends AdminClient {
}
@Override
+ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String
groupId, Map<TopicPartition, OffsetAndMetadata> offsets,
AlterConsumerGroupOffsetsOptions options) {
+ // Stub: the mock does not simulate this operation; every call throws
+ // UnsupportedOperationException regardless of the arguments.
+ // NOTE(review): the message reads "Not implement yet"; presumably
+ // "Not implemented yet" was intended - confirm before changing, since
+ // it is a runtime string.
+ throw new UnsupportedOperationException("Not implement yet");
+ }
+
+ @Override
+ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec>
topicPartitionOffsets, ListOffsetsOptions options) {
+ // Stub: the mock does not simulate this operation; every call throws
+ // UnsupportedOperationException regardless of the arguments.
+ // NOTE(review): the message reads "Not implement yet"; presumably
+ // "Not implemented yet" was intended - confirm before changing, since
+ // it is a runtime string.
+ throw new UnsupportedOperationException("Not implement yet");
+ }
+
+ @Override
public void close(Duration timeout) {}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 68fac2b..79477b7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
@@ -61,7 +62,6 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fd1be37..ae02241 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
@@ -70,7 +71,6 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c8bd751..c8563e7 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2d17659..139335a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -27,9 +27,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import kafka.utils._
import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer,
OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.{CommonClientConfigs, admin}
-import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@@ -42,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
+import org.apache.kafka.common.requests.ListOffsetResponse
object ConsumerGroupCommand extends Logging {
@@ -169,9 +169,6 @@ object ConsumerGroupCommand extends Logging {
private val adminClient = createAdminClient(configOverrides)
- // `consumers` are only needed for `describe`, so we instantiate them
lazily
- private lazy val consumers: mutable.Map[String, KafkaConsumer[String,
String]] = mutable.Map.empty
-
// We have to make sure it is evaluated once and available
private lazy val resetPlanFromFile: Option[Map[String, Map[TopicPartition,
OffsetAndMetadata]]] = {
if (opts.options.has(opts.resetFromFileOpt)) {
@@ -390,8 +387,13 @@ object ConsumerGroupCommand extends Logging {
// Dry-run is the default behavior if --execute is not
specified
val dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt)
- if (!dryRun)
- getConsumer(groupId).commitSync(preparedOffsets.asJava)
+ if (!dryRun) {
+ adminClient.alterConsumerGroupOffsets(
+ groupId,
+ preparedOffsets.asJava,
+ withTimeoutMs(new AlterConsumerGroupOffsetsOptions)
+ ).all.get
+ }
acc.updated(groupId, preparedOffsets)
case currentState =>
printError(s"Assignments can only be reset if the group
'$groupId' is inactive, but the current state is $currentState.")
@@ -574,34 +576,50 @@ object ConsumerGroupCommand extends Logging {
}
private def getLogEndOffsets(groupId: String, topicPartitions:
Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
- val offsets = getConsumer(groupId).endOffsets(topicPartitions.asJava)
+ val endOffsets = topicPartitions.map { topicPartition =>
+ topicPartition -> OffsetSpec.latest
+ }.toMap
+ val offsets = adminClient.listOffsets(
+ endOffsets.asJava,
+ withTimeoutMs(new ListOffsetsOptions)
+ ).all.get
topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
- case Some(logEndOffset) => topicPartition ->
LogOffsetResult.LogOffset(logEndOffset)
+ case Some(listOffsetsResultInfo) => topicPartition ->
LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
case _ => topicPartition -> LogOffsetResult.Unknown
}
}.toMap
}
private def getLogStartOffsets(groupId: String, topicPartitions:
Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = {
- val offsets =
getConsumer(groupId).beginningOffsets(topicPartitions.asJava)
+ val startOffsets = topicPartitions.map { topicPartition =>
+ topicPartition -> OffsetSpec.earliest
+ }.toMap
+ val offsets = adminClient.listOffsets(
+ startOffsets.asJava,
+ withTimeoutMs(new ListOffsetsOptions)
+ ).all.get
topicPartitions.map { topicPartition =>
Option(offsets.get(topicPartition)) match {
- case Some(logStartOffset) => topicPartition ->
LogOffsetResult.LogOffset(logStartOffset)
+ case Some(listOffsetsResultInfo) => topicPartition ->
LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
case _ => topicPartition -> LogOffsetResult.Unknown
}
}.toMap
}
private def getLogTimestampOffsets(groupId: String, topicPartitions:
Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition,
LogOffsetResult] = {
- val consumer = getConsumer(groupId)
- consumer.assign(topicPartitions.asJava)
-
+ val timestampOffsets = topicPartitions.map { topicPartition =>
+ topicPartition -> OffsetSpec.forTimestamp(timestamp)
+ }.toMap
+ val offsets = adminClient.listOffsets(
+ timestampOffsets.asJava,
+ withTimeoutMs(new ListOffsetsOptions)
+ ).all.get
val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =
- consumer.offsetsForTimes(topicPartitions.map(_ ->
timestamp).toMap.asJava).asScala.partition(_._2 != null)
+ offsets.asScala.partition(_._2.offset !=
ListOffsetResponse.UNKNOWN_OFFSET)
val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {
- case (topicPartition, offsetAndTimestamp) => topicPartition ->
LogOffsetResult.LogOffset(offsetAndTimestamp.offset)
+ case (topicPartition, listOffsetsResultInfo) => topicPartition ->
LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)
}.toMap
successfulLogTimestampOffsets ++ getLogEndOffsets(groupId,
unsuccessfulOffsetsForTimes.keySet.toSeq)
@@ -609,9 +627,6 @@ object ConsumerGroupCommand extends Logging {
def close(): Unit = {
adminClient.close()
- consumers.values.foreach(consumer =>
- Option(consumer).foreach(_.close())
- )
}
private def createAdminClient(configOverrides: Map[String, String]): Admin
= {
@@ -621,32 +636,6 @@ object ConsumerGroupCommand extends Logging {
admin.AdminClient.create(props)
}
- private def getConsumer(groupId: String) = {
- if (consumers.get(groupId).isEmpty)
- consumers.update(groupId, createConsumer(groupId))
- consumers(groupId)
- }
-
- private def createConsumer(groupId: String): KafkaConsumer[String, String]
= {
- val properties = new Properties()
- val deserializer = (new StringDeserializer).getClass.getName
- val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
deserializer)
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
deserializer)
-
- if (opts.options.has(opts.commandConfigOpt)) {
-
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)).asScala.foreach {
- case (k,v) => properties.put(k, v)
- }
- }
-
- new KafkaConsumer(properties)
- }
-
private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) = {
val t = opts.options.valueOf(opts.timeoutMsOpt).intValue()
options.timeoutMs(t)
@@ -657,8 +646,17 @@ object ConsumerGroupCommand extends Logging {
val topicPartitions = topicArg.split(":")
val topic = topicPartitions(0)
topicPartitions(1).split(",").map(partition => new
TopicPartition(topic, partition.toInt))
- case topic => getConsumer(groupId).partitionsFor(topic).asScala
- .map(partitionInfo => new TopicPartition(topic,
partitionInfo.partition))
+ case topic =>
+ val descriptionMap = adminClient.describeTopics(
+ Seq(topic).asJava,
+ withTimeoutMs(new DescribeTopicsOptions)
+ ).all().get.asScala
+ val r = descriptionMap.flatMap{ case(topic, description) =>
+ description.partitions().asScala.map{ tpInfo =>
+ new TopicPartition(topic, tpInfo.partition)
+ }
+ }
+ r
}
private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 43bc042..2089e5c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -30,7 +30,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors._
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3995bec..4e7ceda 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -35,7 +35,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile,
OffsetCheckpoints}
import kafka.utils._
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, Node, TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 38fe165..38073dc 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClient,
AdminClientConfig, Al
import org.apache.kafka.clients.consumer._
import
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.{ElectionType, IsolationLevel}
import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index eb50e96..f57dd85 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -28,14 +28,14 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.server._
import kafka.utils._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.{ApiException,
OffsetNotAvailableException, ReplicaNotAvailableException}
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel,
ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
import org.junit.Test
import org.junit.Assert._
import org.mockito.Mockito._
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 895d9e5..07acfde 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -25,10 +25,10 @@ import kafka.log.LogConfig
import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec,
ZStdCompressionCodec}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
IsolationLevel, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.serialization.{ByteArraySerializer,
StringSerializer}
import org.junit.Assert._
import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6f30c11..8523491 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -33,7 +33,7 @@ import kafka.network.RequestChannel.SendResponse
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.memory.MemoryPool
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 9c97c1a..a298d84 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -19,9 +19,9 @@ package kafka.server
import java.util.Optional
import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest,
ListOffsetResponse}
+import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse}
import org.junit.Assert._
import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index d88540d..85d2ded 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -24,10 +24,10 @@ import java.util.{Optional, Properties, Random}
import kafka.log.{Log, LogSegment}
import kafka.network.SocketServer
import kafka.utils.{MockTime, TestUtils}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
IsolationLevel, ListOffsetRequest, ListOffsetResponse}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
ListOffsetRequest, ListOffsetResponse}
import org.easymock.{EasyMock, IAnswer}
import org.junit.Assert._
import org.junit.Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9aaae92..01d6bf0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -46,7 +46,7 @@ import
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index f34eae4..6b1d3c1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,7 +22,7 @@ import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
-import org.apache.kafka.common.{ElectionType, Node, TopicPartition}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicCollection}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2aecdc2..cdec817 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -50,10 +50,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
/**
* Configuration for a {@link KafkaStreams} instance.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1d31377..27b574a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -44,8 +44,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
-import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
-import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
+import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
+import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 15d6789..c611dc2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 0c9889a..255afc8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 5daf066..e98d152 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -27,11 +27,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;