This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79ad902 MINOR: Missing throttle time in OffsetsForLeaderEpoch
response (#5635)
79ad902 is described below
commit 79ad9026a667469a2013ce82961c0c90f3bb0877
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Sep 11 09:08:22 2018 -0700
MINOR: Missing throttle time in OffsetsForLeaderEpoch response (#5635)
With KIP-320, the OffsetsForLeaderEpoch API is intended to be used by
consumers to detect log truncation. Therefore the new response schema should
expose a field for the throttle time like all the other APIs.
Reviewers: Dong Lin <[email protected]>
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 2 +-
.../requests/OffsetsForLeaderEpochRequest.java | 4 ++--
.../requests/OffsetsForLeaderEpochResponse.java | 25 +++++++++++++++++-----
.../kafka/common/requests/RequestResponseTest.java | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 1 +
6 files changed, 27 insertions(+), 10 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e0cdfd9..3d77100 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -155,7 +155,7 @@ public enum ApiKeys {
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(),
DeleteRecordsResponse.schemaVersions()),
INIT_PRODUCER_ID(22, "InitProducerId",
InitProducerIdRequest.schemaVersions(),
InitProducerIdResponse.schemaVersions()),
- OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true,
OffsetsForLeaderEpochRequest.schemaVersions(),
+ OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false,
OffsetsForLeaderEpochRequest.schemaVersions(),
OffsetsForLeaderEpochResponse.schemaVersions()),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false,
RecordBatch.MAGIC_VALUE_V2,
AddPartitionsToTxnRequest.schemaVersions(),
AddPartitionsToTxnResponse.schemaVersions()),
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 9de5d02..1c9009c 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -56,7 +56,7 @@ public class OffsetsForLeaderEpochRequest extends
AbstractRequest {
// V1 request is the same as v0. Per-partition leader epoch has been added
to response
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 =
OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
- // V2 adds the current leader epoch to support fencing
+ // V2 adds the current leader epoch to support fencing and the addition of
the throttle time in the response
private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
PARTITION_ID,
CURRENT_LEADER_EPOCH,
@@ -177,7 +177,7 @@ public class OffsetsForLeaderEpochRequest extends
AbstractRequest {
errorResponse.put(tp, new EpochEndOffset(
error, EpochEndOffset.UNDEFINED_EPOCH,
EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
}
- return new OffsetsForLeaderEpochResponse(errorResponse);
+ return new OffsetsForLeaderEpochResponse(throttleTimeMs,
errorResponse);
}
public static class PartitionData {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 324a2ed..55aa71b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -34,6 +34,7 @@ import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
public class OffsetsForLeaderEpochResponse extends AbstractResponse {
@@ -65,18 +66,23 @@ public class OffsetsForLeaderEpochResponse extends
AbstractResponse {
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new
Schema(
TOPICS_V1);
- // V2 bumped for addition of current leader epoch to the request schema.
- private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 =
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1;
+ // V2 bumped for addition of current leader epoch to the request schema
and the addition of the throttle
+ // time in the response
+ private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new
Schema(
+ THROTTLE_TIME_MS,
+ TOPICS_V1);
public static Schema[] schemaVersions() {
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0,
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2};
}
- private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
+ private final int throttleTimeMs;
+ private final Map<TopicPartition, EpochEndOffset>
epochEndOffsetsByPartition;
public OffsetsForLeaderEpochResponse(Struct struct) {
- epochEndOffsetsByPartition = new HashMap<>();
+ this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS,
DEFAULT_THROTTLE_TIME);
+ this.epochEndOffsetsByPartition = new HashMap<>();
for (Object topicAndEpocsObj : struct.get(TOPICS)) {
Struct topicAndEpochs = (Struct) topicAndEpocsObj;
String topic = topicAndEpochs.get(TOPIC_NAME);
@@ -93,6 +99,11 @@ public class OffsetsForLeaderEpochResponse extends
AbstractResponse {
}
public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset>
epochsByTopic) {
+ this(DEFAULT_THROTTLE_TIME, epochsByTopic);
+ }
+
+ public OffsetsForLeaderEpochResponse(int throttleTimeMs,
Map<TopicPartition, EpochEndOffset> epochsByTopic) {
+ this.throttleTimeMs = throttleTimeMs;
this.epochEndOffsetsByPartition = epochsByTopic;
}
@@ -108,6 +119,10 @@ public class OffsetsForLeaderEpochResponse extends
AbstractResponse {
return errorCounts;
}
+ public int throttleTimeMs() {
+ return throttleTimeMs;
+ }
+
public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short
versionId) {
return new
OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
}
@@ -115,9 +130,9 @@ public class OffsetsForLeaderEpochResponse extends
AbstractResponse {
@Override
protected Struct toStruct(short version) {
Struct responseStruct = new
Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+ responseStruct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
Map<String, Map<Integer, EpochEndOffset>> endOffsetsByTopic =
CollectionUtils.groupPartitionDataByTopic(epochEndOffsetsByPartition);
-
List<Struct> topics = new ArrayList<>(endOffsetsByTopic.size());
for (Map.Entry<String, Map<Integer, EpochEndOffset>>
topicToPartitionEpochs : endOffsetsByTopic.entrySet()) {
Struct topicStruct = responseStruct.instance(TOPICS);
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 05b9926..e34348a 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1053,7 +1053,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic1", 1), new
EpochEndOffset(Errors.NONE, 1, 1));
epochs.put(new TopicPartition("topic2", 2), new
EpochEndOffset(Errors.NONE, 1, 2));
- return new OffsetsForLeaderEpochResponse(epochs);
+ return new OffsetsForLeaderEpochResponse(0, epochs);
}
private AddPartitionsToTxnRequest createAddPartitionsToTxnRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f32b94..5a82cc4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2042,7 +2042,8 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizeClusterAction(request)
val lastOffsetForLeaderEpoch =
replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
- sendResponseExemptThrottle(request, new
OffsetsForLeaderEpochResponse(lastOffsetForLeaderEpoch))
+ sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new OffsetsForLeaderEpochResponse(requestThrottleMs,
lastOffsetForLeaderEpoch))
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3b7ecfb..7df1bd6 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -450,6 +450,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new
ExpireDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new
RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new
DeleteGroupsResponse(response).throttleTimeMs
+ case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new
OffsetsForLeaderEpochResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time
for $requestId")
}
}