lqjacklee created KAFKA-10208:
---------------------------------
Summary:
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler
returns null when the Broker unexpectedly doesn't support requireStable flag on
version, without providing any information
Key: KAFKA-10208
URL: https://issues.apache.org/jira/browse/KAFKA-10208
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.7.0
Reporter: lqjacklee
When a 2.7.0 client sends a request to a broker whose version is 2.3.0, the
OffsetAndMetadata will be null and the key information is missing.
I have created the test case below:
@Test
public void testCreateTopicAndCheckTheOffsite() throws ExecutionException,
InterruptedException {
String topicName = UUID.randomUUID().toString();
String groupId = "DEMO_" + topicName;
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
String serializer = StringSerializer.class.getName();
String deserializer = StringDeserializer.class.getName();
props.put("auto.offset.reset", "latest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient adminClient = AdminClient.create(props);
boolean topicExist = false;
try {
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
CreateTopicsOptions createTopicsOptions = new CreateTopicsOptions();
createTopicsOptions.timeoutMs(3000000);
final CreateTopicsResult createTopicsResult =
adminClient.createTopics(Collections.singleton(newTopic), createTopicsOptions);
createTopicsResult.values().get(topicName).get();
}catch (TopicExistsException e) {
topicExist = true;
}
try {
List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer<String,String> kafkaConsumer = new
KafkaConsumer<>(props);
Field kafkaClientField =
kafkaConsumer.getClass().getDeclaredField("client");
kafkaClientField.setAccessible(true);
ConsumerNetworkClient client = (ConsumerNetworkClient)
kafkaClientField.get(kafkaConsumer);
FindCoordinatorRequest.Builder findCoordinatorRequest =
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
.setKey(groupId));
Node node = client.leastLoadedNode();
Node coordinator;
RequestFuture<Node> requestCoordinatorFuture = client.send(node,
findCoordinatorRequest)
.compose(new RequestFutureAdapter<ClientResponse, Node>() {
@Override
public void onFailure(RuntimeException e,
RequestFuture<Node> future) {
super.onFailure(e, future);
}
@Override
public void onSuccess(ClientResponse value,
RequestFuture<Node> future) {
Node coordinator;
FindCoordinatorResponse findCoordinatorResponse =
(FindCoordinatorResponse) value.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
// use MAX_VALUE - node.id as the coordinator
id to allow separate connections
// for the coordinator in the underlying
network client layer
int coordinatorConnectionId = Integer.MAX_VALUE
- findCoordinatorResponse.data().nodeId();
coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
client.tryConnect(coordinator);
future.complete(coordinator);
} else if (error ==
Errors.GROUP_AUTHORIZATION_FAILED) {
Assert.fail(error.message());
} else {
future.raise(error);
}
}
});
client.poll(requestCoordinatorFuture);
if (requestCoordinatorFuture.succeeded()) {
coordinator = requestCoordinatorFuture.value();
} else {
throw requestCoordinatorFuture.exception();
}
OffsetFetchRequest.Builder requestBuilder =
new OffsetFetchRequest.Builder(groupId, true,
topicPartitions, true);
RequestFuture<Map<TopicPartition, OffsetAndMetadata>>
topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder)
.compose(new RequestFutureAdapter<ClientResponse,
Map<TopicPartition, OffsetAndMetadata>>() {
@Override
public void onSuccess(ClientResponse value,
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
OffsetFetchResponse response =
(OffsetFetchResponse) value.responseBody();
if (response.hasError()) {
Errors error = response.error();
if (error ==
Errors.COORDINATOR_LOAD_IN_PROGRESS) {
// just retry
future.raise(error);
} else if (error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry
future.raise(error);
} else if (error ==
Errors.GROUP_AUTHORIZATION_FAILED) {
Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + "");
} else {
future.raise(new KafkaException("Unexpected
error in fetch offset response: " + error.message()));
}
return;
}
Set<String> unauthorizedTopics = null;
Map<TopicPartition, OffsetAndMetadata> offsets =
new HashMap<>(response.responseData().size());
Set<TopicPartition>
unstableTxnOffsetTopicPartitions = new HashSet<>();
for (Map.Entry<TopicPartition,
OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData partitionData
= entry.getValue();
if (partitionData.hasError()) {
Errors error = partitionData.error;
if (error ==
Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.raise(new KafkaException("Topic
or Partition " + tp + " does not exist"));
return;
} else if (error ==
Errors.TOPIC_AUTHORIZATION_FAILED) {
if (unauthorizedTopics == null) {
unauthorizedTopics = new
HashSet<>();
}
unauthorizedTopics.add(tp.topic());
} else if (error ==
Errors.UNSTABLE_OFFSET_COMMIT) {
unstableTxnOffsetTopicPartitions.add(tp);
} else {
future.raise(new
KafkaException("Unexpected error in fetch offset response for partition " +
tp + ": " + error.message()));
return;
}
} else if (partitionData.offset >= 0) {
// record the position with the offset (-1
indicates no committed offset to fetch);
// if there's no committed offset, record
as null
offsets.put(tp, new
OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch,
partitionData.metadata));
} else {
try {
HashMap<TopicPartition, OffsetSpec>
offsetMap = new HashMap<>();
offsetMap.put(tp,
OffsetSpec.earliest());
ListOffsetsResult listOffsetsResult =
adminClient.listOffsets(offsetMap);
Map<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap
= listOffsetsResult.all().get();
ListOffsetsResult.ListOffsetsResultInfo
offsetsResultInfo = topicPartitionListOffsetsResultInfoMap.get(tp);
offsets.put(tp, new
OffsetAndMetadata(offsetsResultInfo.offset(), offsetsResultInfo.leaderEpoch(),
""));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
Assert.fail("not found the topic and
partition");
}
if (unauthorizedTopics != null) {
future.raise(new
TopicAuthorizationException(unauthorizedTopics));
} else if
(!unstableTxnOffsetTopicPartitions.isEmpty()) {
// just retry
future.raise(new
UnstableOffsetCommitException("There are unstable offsets for the requested
topic partitions"));
} else {
future.complete(offsets);
}
}
@Override
public void onFailure(RuntimeException e,
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
super.onFailure(e, future);
}
});
client.poll(topicPartitionMetadataRequestFuture);
if(topicPartitionMetadataRequestFuture.succeeded()) {
Map<TopicPartition, OffsetAndMetadata> value =
topicPartitionMetadataRequestFuture.value();
Assert.assertNotNull(value);
}else {
Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage());
}
}catch (Exception e) {
Assert.fail(e.getMessage());
}finally {
if(topicExist) {
List<String> topicToDeleted = new ArrayList<>();
DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(topicToDeleted);
deleteTopicsResult.all().get();
}
}
}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)