Repository: kafka Updated Branches: refs/heads/trunk c5df2a8e3 -> d8fe98efe
kafka-2044; Support requests and responses from o.a.k.common in KafkaApis; patched by Gwen Shapira; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8fe98ef Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8fe98ef Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8fe98ef Branch: refs/heads/trunk Commit: d8fe98efee5a44ae12c1e3484fa20f89b0f30054 Parents: c5df2a8 Author: Gwen Shapira <csh...@gmail.com> Authored: Sat Mar 28 08:39:48 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Sat Mar 28 08:39:48 2015 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 4 +- .../main/java/org/apache/kafka/common/Node.java | 4 ++ .../kafka/common/requests/AbstractRequest.java | 62 ++++++++++++++++++++ .../requests/ConsumerMetadataRequest.java | 9 ++- .../kafka/common/requests/FetchRequest.java | 17 +++++- .../kafka/common/requests/FetchResponse.java | 3 + .../kafka/common/requests/HeartbeatRequest.java | 8 ++- .../kafka/common/requests/JoinGroupRequest.java | 8 ++- .../common/requests/ListOffsetRequest.java | 15 ++++- .../kafka/common/requests/MetadataRequest.java | 14 ++++- .../kafka/common/requests/MetadataResponse.java | 19 ++++++ .../common/requests/OffsetCommitRequest.java | 13 +++- .../common/requests/OffsetFetchRequest.java | 17 +++++- .../common/requests/OffsetFetchResponse.java | 3 + .../kafka/common/requests/ProduceRequest.java | 19 +++++- .../kafka/common/requests/ProduceResponse.java | 2 + .../common/requests/RequestResponseTest.java | 34 +++++++---- .../kafka/api/HeartbeatRequestAndHeader.scala | 45 -------------- .../kafka/api/HeartbeatResponseAndHeader.scala | 28 --------- .../kafka/api/JoinGroupRequestAndHeader.scala | 45 -------------- .../kafka/api/JoinGroupResponseAndHeader.scala | 28 --------- core/src/main/scala/kafka/api/RequestKeys.scala | 4 +- .../kafka/network/BoundedByteBufferSend.scala | 8 +++ .../scala/kafka/network/RequestChannel.scala | 19 +++++- .../src/main/scala/kafka/server/KafkaApis.scala | 48 +++++++++------ .../api/RequestResponseSerializationTest.scala | 29 +-------- 26 files changed, 287 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index cca4b38..f2e6cec 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -66,6 +66,8 @@ <subpackage name="requests"> <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.network" /> + <!-- for testing --> + <allow pkg="org.apache.kafka.common.errors" /> </subpackage> <subpackage name="serialization"> @@ -97,4 +99,4 @@ <allow pkg="org.apache.kafka" /> </subpackage> -</import-control> \ No newline at end of file +</import-control> http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/Node.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 88c3b24..f4e4186 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -28,6 +28,10 @@ public class Node { this.port = port; } + public static Node noNode() { + return new Node(-1, "", -1); + } + /** * The node id of this node */ http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java new file mode 100644 index 0000000..5e5308e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public abstract class AbstractRequest extends AbstractRequestResponse { + + public AbstractRequest(Struct struct) { + super(struct); + } + + /** + * Get an error response for a request + */ + public abstract AbstractRequestResponse getErrorResponse(Throwable e); + + /** + * Factory method for getting a request object based on ApiKey ID and a buffer + */ + public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) { + switch (ApiKeys.forId(requestId)) { + case PRODUCE: + return ProduceRequest.parse(buffer); + case FETCH: + return FetchRequest.parse(buffer); + case LIST_OFFSETS: + return ListOffsetRequest.parse(buffer); + case METADATA: + return MetadataRequest.parse(buffer); + case OFFSET_COMMIT: + return OffsetCommitRequest.parse(buffer); + case OFFSET_FETCH: + return OffsetFetchRequest.parse(buffer); + case CONSUMER_METADATA: + return ConsumerMetadataRequest.parse(buffer); + case JOIN_GROUP: + return JoinGroupRequest.parse(buffer); + case HEARTBEAT: + return HeartbeatRequest.parse(buffer); + default: + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java index 1651e75..04b90bf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -12,14 +12,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -public class ConsumerMetadataRequest extends AbstractRequestResponse { +public class ConsumerMetadataRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -38,6 +40,11 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse { groupId = struct.getString(GROUP_ID_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); + } + public String groupId() { return groupId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 721e7d3..8686d83 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -20,12 +20,13 @@ import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; -public class FetchRequest extends AbstractRequestResponse { +public class FetchRequest extends AbstractRequest { public static final int CONSUMER_REPLICA_ID = -1; private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id); @@ -118,6 +119,20 @@ public class FetchRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>(); + + for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) { + FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(), + FetchResponse.INVALID_HIGHWATERMARK, + FetchResponse.EMPTY_RECORD_SET); + responseData.put(entry.getKey(), partitionResponse); + } + + return new FetchResponse(responseData); + } + public int replicaId() { return replicaId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index f020aaa..eb8951f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -55,6 +55,9 @@ public class FetchResponse extends AbstractRequestResponse { private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark"; private static final String RECORD_SET_KEY_NAME = "record_set"; + public static final long INVALID_HIGHWATERMARK = -1L; + public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0); + private final Map<TopicPartition, PartitionData> responseData; public static final class PartitionData { http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 6943878..51d081f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -13,13 +13,14 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -public class HeartbeatRequest extends AbstractRequestResponse { +public class HeartbeatRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -62,4 +63,9 @@ public class HeartbeatRequest extends AbstractRequestResponse { public static HeartbeatRequest parse(ByteBuffer buffer) { return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new HeartbeatResponse(Errors.forException(e).code()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 1ebc188..6795682 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -21,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class JoinGroupRequest extends AbstractRequestResponse { +public class JoinGroupRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -87,4 +88,9 @@ public class JoinGroupRequest extends AbstractRequestResponse { public static JoinGroupRequest parse(ByteBuffer buffer) { return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new JoinGroupResponse(Errors.forException(e).code()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index e5dc92e..19267ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -29,7 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ListOffsetRequest extends AbstractRequestResponse { +public class ListOffsetRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id); private static final String REPLICA_ID_KEY_NAME = "replica_id"; @@ -105,6 +106,18 @@ public class ListOffsetRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>(); + + for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) { + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>()); + responseData.put(entry.getKey(), partitionResponse); + } + + return new ListOffsetResponse(responseData); + } + public int replicaId() { return replicaId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 5d5f52c..7e0ce15 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -14,14 +14,17 @@ package org.apache.kafka.common.requests; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -public class MetadataRequest extends AbstractRequestResponse { +public class MetadataRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); private static final String TOPICS_KEY_NAME = "topics"; @@ -43,6 +46,15 @@ public class MetadataRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map<String, Errors> topicErrors = new HashMap<String, Errors>(); + for (String topic: topics) { + topicErrors.put(topic, Errors.forException(e)); + } + return new MetadataResponse(topicErrors); + } + public List<String> topics() { return topics; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 36736ec..44e2ce6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -67,6 +67,25 @@ public class MetadataResponse extends AbstractRequestResponse { private final Cluster cluster; private final Map<String, Errors> errors; + /* Constructor for error responses where most of the data, except error per topic, is irrelevant */ + public MetadataResponse(Map<String, Errors> topicErrors) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(BROKERS_KEY_NAME, new ArrayList<Struct>().toArray()); + List<Struct> topicArray = new ArrayList<Struct>(); + for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) { + Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, topicError.getValue().code()); + topicData.set(TOPIC_KEY_NAME, topicError.getKey()); + topicData.set(PARTITION_METADATA_KEY_NAME, new ArrayList<Struct>().toArray()); + topicArray.add(topicData); + } + struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray()); + + this.errors = topicErrors; + this.cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>()); + } + public MetadataResponse(Cluster cluster) { super(new Struct(CURRENT_SCHEMA)); http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index b92f670..a0e1976 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -28,7 +29,8 @@ import org.apache.kafka.common.utils.CollectionUtils; /** * This wrapper supports both v0 and v1 of OffsetCommitRequest. */ -public class OffsetCommitRequest extends AbstractRequestResponse { +public class OffsetCommitRequest extends AbstractRequest { + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; @@ -213,6 +215,15 @@ public class OffsetCommitRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>(); + for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) { + responseData.put(entry.getKey(), Errors.forException(e).code()); + } + return new OffsetCommitResponse(responseData); + } + public String groupId() { return groupId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 16c807c..deec1fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -21,13 +22,14 @@ import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; /** * This wrapper supports both v0 and v1 of OffsetFetchRequest. */ -public class OffsetFetchRequest extends AbstractRequestResponse { +public class OffsetFetchRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -85,6 +87,19 @@ public class OffsetFetchRequest extends AbstractRequestResponse { groupId = struct.getString(GROUP_ID_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>(); + + for (TopicPartition partition: partitions) { + responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, + OffsetFetchResponse.NO_METADATA, + Errors.forException(e).code())); + } + + return new OffsetFetchResponse(responseData); + } + public String groupId() { return groupId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index f10c246..512a0ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -41,6 +41,9 @@ public class OffsetFetchResponse extends AbstractRequestResponse { private static final String METADATA_KEY_NAME = "metadata"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final long INVALID_OFFSET = -1L; + public static final String NO_METADATA = ""; + /** * Possible error code: * http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 995f89f..fabeae3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -26,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ProduceRequest extends AbstractRequestResponse { +public class ProduceRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); private static final String ACKS_KEY_NAME = "acks"; @@ -88,6 +89,22 @@ public class ProduceRequest extends AbstractRequestResponse { timeout = struct.getInt(TIMEOUT_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + + /* In case the producer doesn't actually want any response */ + if (acks == 0) + return null; + + Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>(); + + for (Map.Entry<TopicPartition, ByteBuffer> entry: partitionRecords.entrySet()) { + responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET)); + } + + return new ProduceResponse(responseMap); + } + public short acks() { return acks; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 4b67f70..37ec0b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -38,6 +38,8 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String PARTITION_KEY_NAME = "partition"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final long INVALID_OFFSET = -1L; + /** * Possible error code: * http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 61a767a..e3cc196 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.protocol.Errors; import org.junit.Test; @@ -33,29 +34,38 @@ public class RequestResponseTest { @Test public void testSerialization() throws Exception { - List<AbstractRequestResponse> requestList = Arrays.asList( + List<AbstractRequestResponse> requestResponseList = Arrays.asList( createRequestHeader(), createResponseHeader(), createConsumerMetadataRequest(), + createConsumerMetadataRequest().getErrorResponse(new UnknownServerException()), createConsumerMetadataResponse(), createFetchRequest(), + createFetchRequest().getErrorResponse(new UnknownServerException()), createFetchResponse(), createHeartBeatRequest(), + createHeartBeatRequest().getErrorResponse(new UnknownServerException()), createHeartBeatResponse(), createJoinGroupRequest(), + createJoinGroupRequest().getErrorResponse(new UnknownServerException()), createJoinGroupResponse(), createListOffsetRequest(), + createListOffsetRequest().getErrorResponse(new UnknownServerException()), createListOffsetResponse(), createMetadataRequest(), + createMetadataRequest().getErrorResponse(new UnknownServerException()), createMetadataResponse(), createOffsetCommitRequest(), + createOffsetCommitRequest().getErrorResponse(new UnknownServerException()), createOffsetCommitResponse(), createOffsetFetchRequest(), + createOffsetFetchRequest().getErrorResponse(new UnknownServerException()), createOffsetFetchResponse(), createProduceRequest(), + createProduceRequest().getErrorResponse(new UnknownServerException()), createProduceResponse()); - for (AbstractRequestResponse req: requestList) { + for (AbstractRequestResponse req: requestResponseList) { ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf()); req.writeTo(buffer); buffer.rewind(); @@ -75,7 +85,7 @@ public class RequestResponseTest { return new ResponseHeader(10); } - private AbstractRequestResponse createConsumerMetadataRequest() { + private AbstractRequest createConsumerMetadataRequest() { return new ConsumerMetadataRequest("test-group"); } @@ -83,7 +93,7 @@ public class RequestResponseTest { return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014)); } - private AbstractRequestResponse createFetchRequest() { + private AbstractRequest createFetchRequest() { Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>(); fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000)); fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000)); @@ -96,7 +106,7 @@ public class RequestResponseTest { return new FetchResponse(responseData); } - private AbstractRequestResponse createHeartBeatRequest() { + private AbstractRequest createHeartBeatRequest() { return new HeartbeatRequest("group1", 1, "consumer1"); } @@ -104,7 +114,7 @@ public class RequestResponseTest { return new HeartbeatResponse(Errors.NONE.code()); } - private AbstractRequestResponse createJoinGroupRequest() { + private AbstractRequest createJoinGroupRequest() { return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1"); } @@ -112,7 +122,7 @@ public class RequestResponseTest { return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1))); } - private AbstractRequestResponse createListOffsetRequest() { + private AbstractRequest createListOffsetRequest() { Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(); offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); return new ListOffsetRequest(-1, offsetData); @@ -124,7 +134,7 @@ public class RequestResponseTest { return new ListOffsetResponse(responseData); } - private AbstractRequestResponse createMetadataRequest() { + private AbstractRequest createMetadataRequest() { return new MetadataRequest(Arrays.asList("topic1")); } @@ -138,7 +148,7 @@ public class RequestResponseTest { return new MetadataResponse(cluster); } - private AbstractRequestResponse createOffsetCommitRequest() { + private AbstractRequest createOffsetCommitRequest() { Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(); commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData); @@ -150,7 +160,7 @@ public class RequestResponseTest { return new OffsetCommitResponse(responseData); } - private AbstractRequestResponse createOffsetFetchRequest() { + private AbstractRequest createOffsetFetchRequest() { return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1))); } @@ -160,10 +170,10 @@ public class RequestResponseTest { return new OffsetFetchResponse(responseData); } - private AbstractRequestResponse createProduceRequest() { + private AbstractRequest createProduceRequest() { Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>(); produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); - return new ProduceRequest(Errors.NONE.code(), 5000, produceData); + return new ProduceRequest((short) 1, 5000, produceData); } private AbstractRequestResponse createProduceResponse() { http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala deleted file mode 100644 index f168d9f..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest} -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object HeartbeatRequestAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = HeartbeatRequest.parse(buffer) - new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class HeartbeatRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: HeartbeatRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala deleted file mode 100644 index 9a71faa..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.HeartbeatResponse -import java.nio.ByteBuffer - -object HeartbeatResponseAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { - val correlationId = buffer.getInt - val body = HeartbeatResponse.parse(buffer) - new HeartbeatResponseAndHeader(correlationId, body) - } -} - -case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { -} http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala deleted file mode 100644 index 3651e86..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests._ -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object JoinGroupRequestAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = JoinGroupRequest.parse(buffer) - new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class JoinGroupRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: JoinGroupRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala deleted file mode 100644 index d0f07e0..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.JoinGroupResponse -import java.nio.ByteBuffer - -object JoinGroupResponseAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { - val correlationId = buffer.getInt - val body = JoinGroupResponse.parse(buffer) - new JoinGroupResponseAndHeader(correlationId, body) - } -} - -case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { -} http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/RequestKeys.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index c24c034..ef7a86e 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -46,9 +46,7 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), - JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom), - HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom) ) def nameForKey(key: Short): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index 55ecac2..b95b73b 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -21,6 +21,7 @@ import java.nio._ import java.nio.channels._ import kafka.utils._ import kafka.api.RequestOrResponse +import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { @@ -50,6 +51,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send buffer.rewind() } + def this(header: ResponseHeader, body: AbstractRequestResponse) = { + this(header.sizeOf + body.sizeOf) + header.writeTo(buffer) + body.writeTo(buffer) + buffer.rewind + } + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7b1db3d..bc73540 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger @@ -47,7 +48,23 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() - val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) + val requestObj = + if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) + RequestKeys.deserializerForKey(requestId)(buffer) + else + null + val header: RequestHeader = + if (requestObj == null) { + buffer.rewind + RequestHeader.parse(buffer) + } else + null + val body: AbstractRequest = + if (requestObj == null) + AbstractRequest.getRequest(header.apiKey, buffer) + else + null + buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c33e848..f372af7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,8 +17,7 @@ package kafka.server -import org.apache.kafka.common.requests.JoinGroupResponse -import org.apache.kafka.common.requests.HeartbeatResponse +import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ @@ -74,7 +73,19 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case e: Throwable => - request.requestObj.handleError(e, requestChannel, request) + if ( request.requestObj != null) + request.requestObj.handleError(e, requestChannel, request) + else { + val response = request.body.getErrorResponse(e) + val respHeader = new ResponseHeader(request.header.correlationId) + + /* If request doesn't have a default error response, we just close the connection. + For example, when produce request has acks set to 0 */ + if (response == null) + requestChannel.closeConnection(request.processor, request) + else + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + } error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds @@ -484,40 +495,41 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a join-group response def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) - val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) } // let the coordinator to handle join-group coordinator.consumerJoinGroup( - joinGroupRequest.body.groupId(), - joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics().toList, - joinGroupRequest.body.sessionTimeout(), - joinGroupRequest.body.strategy(), + joinGroupRequest.groupId(), + joinGroupRequest.consumerId(), + joinGroupRequest.topics().toList, + joinGroupRequest.sessionTimeout(), + joinGroupRequest.strategy(), sendResponseCallback) } def handleHeartbeatRequest(request: RequestChannel.Request) { - val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { - val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val response = new HeartbeatResponse(errorCode) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) } // let the coordinator to handle heartbeat coordinator.consumerHeartbeat( - heartbeatRequest.body.groupId(), - heartbeatRequest.body.consumerId(), - heartbeatRequest.body.groupGenerationId(), + heartbeatRequest.groupId(), + heartbeatRequest.consumerId(), + heartbeatRequest.groupGenerationId(), sendResponseCallback) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 4cb803f..030faac 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -208,28 +208,6 @@ object SerializationTestUtils { def createConsumerMetadataResponse: ConsumerMetadataResponse = { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) } - - def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { - val body = new HeartbeatRequest("group1", 1, "consumer1") - HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { - val body = new HeartbeatResponse(0.asInstanceOf[Short]) - HeartbeatResponseAndHeader(1, body) - } - - def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); - JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) - JoinGroupResponseAndHeader(1, body) - } } class RequestResponseSerializationTest extends JUnitSuite { @@ -253,10 +231,6 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) - private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader - private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader - private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader - private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader @Test def testSerializationAndDeserialization() { @@ -269,8 +243,7 @@ class RequestResponseSerializationTest extends JUnitSuite { offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, - consumerMetadataResponseNoCoordinator, heartbeatRequest, - heartbeatResponse, joinGroupRequest, joinGroupResponse) + consumerMetadataResponseNoCoordinator) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes)