Repository: kafka
Updated Branches:
  refs/heads/trunk c5df2a8e3 -> d8fe98efe


KAFKA-2044; Support requests and responses from o.a.k.common in KafkaApis; patched by Gwen Shapira; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8fe98ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8fe98ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8fe98ef

Branch: refs/heads/trunk
Commit: d8fe98efee5a44ae12c1e3484fa20f89b0f30054
Parents: c5df2a8
Author: Gwen Shapira <csh...@gmail.com>
Authored: Sat Mar 28 08:39:48 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Sat Mar 28 08:39:48 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  4 +-
 .../main/java/org/apache/kafka/common/Node.java |  4 ++
 .../kafka/common/requests/AbstractRequest.java  | 62 ++++++++++++++++++++
 .../requests/ConsumerMetadataRequest.java       |  9 ++-
 .../kafka/common/requests/FetchRequest.java     | 17 +++++-
 .../kafka/common/requests/FetchResponse.java    |  3 +
 .../kafka/common/requests/HeartbeatRequest.java |  8 ++-
 .../kafka/common/requests/JoinGroupRequest.java |  8 ++-
 .../common/requests/ListOffsetRequest.java      | 15 ++++-
 .../kafka/common/requests/MetadataRequest.java  | 14 ++++-
 .../kafka/common/requests/MetadataResponse.java | 19 ++++++
 .../common/requests/OffsetCommitRequest.java    | 13 +++-
 .../common/requests/OffsetFetchRequest.java     | 17 +++++-
 .../common/requests/OffsetFetchResponse.java    |  3 +
 .../kafka/common/requests/ProduceRequest.java   | 19 +++++-
 .../kafka/common/requests/ProduceResponse.java  |  2 +
 .../common/requests/RequestResponseTest.java    | 34 +++++++----
 .../kafka/api/HeartbeatRequestAndHeader.scala   | 45 --------------
 .../kafka/api/HeartbeatResponseAndHeader.scala  | 28 ---------
 .../kafka/api/JoinGroupRequestAndHeader.scala   | 45 --------------
 .../kafka/api/JoinGroupResponseAndHeader.scala  | 28 ---------
 core/src/main/scala/kafka/api/RequestKeys.scala |  4 +-
 .../kafka/network/BoundedByteBufferSend.scala   |  8 +++
 .../scala/kafka/network/RequestChannel.scala    | 19 +++++-
 .../src/main/scala/kafka/server/KafkaApis.scala | 48 +++++++++------
 .../api/RequestResponseSerializationTest.scala  | 29 +--------
 26 files changed, 287 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cca4b38..f2e6cec 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -66,6 +66,8 @@
                <subpackage name="requests">
                        <allow pkg="org.apache.kafka.common.protocol" />
                        <allow pkg="org.apache.kafka.common.network" />
+                       <!-- for testing -->
+                       <allow pkg="org.apache.kafka.common.errors" />
                </subpackage>
        
                <subpackage name="serialization">
@@ -97,4 +99,4 @@
                <allow pkg="org.apache.kafka" />
        </subpackage>
        
-</import-control>
\ No newline at end of file
+</import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java 
b/clients/src/main/java/org/apache/kafka/common/Node.java
index 88c3b24..f4e4186 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -28,6 +28,10 @@ public class Node {
         this.port = port;
     }
 
+    public static Node noNode() {
+        return new Node(-1, "", -1);
+    }
+
     /**
      * The node id of this node
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
new file mode 100644
index 0000000..5e5308e
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public abstract class AbstractRequest extends AbstractRequestResponse {
+
+    public AbstractRequest(Struct struct) {
+        super(struct);
+    }
+
+    /**
+     * Get an error response for a request
+     */
+    public abstract AbstractRequestResponse getErrorResponse(Throwable e);
+
+    /**
+     * Factory method for getting a request object based on ApiKey ID and a buffer
+     */
+    public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) {
+        switch (ApiKeys.forId(requestId)) {
+            case PRODUCE:
+                return ProduceRequest.parse(buffer);
+            case FETCH:
+                return FetchRequest.parse(buffer);
+            case LIST_OFFSETS:
+                return ListOffsetRequest.parse(buffer);
+            case METADATA:
+                return MetadataRequest.parse(buffer);
+            case OFFSET_COMMIT:
+                return OffsetCommitRequest.parse(buffer);
+            case OFFSET_FETCH:
+                return OffsetFetchRequest.parse(buffer);
+            case CONSUMER_METADATA:
+                return ConsumerMetadataRequest.parse(buffer);
+            case JOIN_GROUP:
+                return JoinGroupRequest.parse(buffer);
+            case HEARTBEAT:
+                return HeartbeatRequest.parse(buffer);
+            default:
+                return null;
+        }
+    }
+}
\ No newline at end of file

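The new AbstractRequest base class is what lets the broker answer any o.a.k.common request with a matching error response. As a rough illustration of that contract (a minimal sketch using only constructors and methods exercised in RequestResponseTest further down; the class name is illustrative, not part of the patch):

import java.nio.ByteBuffer;

import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;

public class ErrorResponseSketch {
    public static void main(String[] args) {
        // Any concrete request (a HeartbeatRequest here, built as in RequestResponseTest)
        // can produce a schema-correct error response for a failure hit on the broker.
        HeartbeatRequest request = new HeartbeatRequest("group1", 1, "consumer1");
        AbstractRequestResponse error = request.getErrorResponse(new UnknownServerException());

        // The error response serializes like any other response body.
        ByteBuffer out = ByteBuffer.allocate(error.sizeOf());
        error.writeTo(out);
        out.rewind();
    }
}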
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 1651e75..04b90bf 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -12,14 +12,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class ConsumerMetadataRequest extends AbstractRequestResponse {
+public class ConsumerMetadataRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
@@ -38,6 +40,11 @@ public class ConsumerMetadataRequest extends 
AbstractRequestResponse {
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+    }
+
     public String groupId() {
         return groupId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 721e7d3..8686d83 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -20,12 +20,13 @@ import java.util.Map;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
-public class FetchRequest extends AbstractRequestResponse {
+public class FetchRequest extends AbstractRequest {
     
     public static final int CONSUMER_REPLICA_ID = -1;
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
@@ -118,6 +119,20 @@ public class FetchRequest extends AbstractRequestResponse {
         }
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
+
+        for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
+            FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
+                                                                                            FetchResponse.INVALID_HIGHWATERMARK,
+                                                                                            FetchResponse.EMPTY_RECORD_SET);
+            responseData.put(entry.getKey(), partitionResponse);
+        }
+
+        return new FetchResponse(responseData);
+    }
+
     public int replicaId() {
         return replicaId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f020aaa..eb8951f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -55,6 +55,9 @@ public class FetchResponse extends AbstractRequestResponse {
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
+    public static final long INVALID_HIGHWATERMARK = -1L;
+    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
+
     private final Map<TopicPartition, PartitionData> responseData;
 
     public static final class PartitionData {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 6943878..51d081f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -13,13 +13,14 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-public class HeartbeatRequest extends AbstractRequestResponse {
+public class HeartbeatRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
@@ -62,4 +63,9 @@ public class HeartbeatRequest extends AbstractRequestResponse 
{
     public static HeartbeatRequest parse(ByteBuffer buffer) {
         return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        return new HeartbeatResponse(Errors.forException(e).code());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1ebc188..6795682 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -21,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class JoinGroupRequest extends AbstractRequestResponse {
+public class JoinGroupRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
@@ -87,4 +88,9 @@ public class JoinGroupRequest extends AbstractRequestResponse 
{
     public static JoinGroupRequest parse(ByteBuffer buffer) {
         return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        return new JoinGroupResponse(Errors.forException(e).code());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e5dc92e..19267ee 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -29,7 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ListOffsetRequest extends AbstractRequestResponse {
+public class ListOffsetRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
@@ -105,6 +106,18 @@ public class ListOffsetRequest extends 
AbstractRequestResponse {
         }
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
+
+        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+            ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
+            responseData.put(entry.getKey(), partitionResponse);
+        }
+
+        return new ListOffsetResponse(responseData);
+    }
+
     public int replicaId() {
         return replicaId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 5d5f52c..7e0ce15 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -14,14 +14,17 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-public class MetadataRequest extends AbstractRequestResponse {
+public class MetadataRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
     private static final String TOPICS_KEY_NAME = "topics";
@@ -43,6 +46,15 @@ public class MetadataRequest extends AbstractRequestResponse 
{
         }
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        Map<String, Errors> topicErrors = new HashMap<String, Errors>();
+        for (String topic: topics) {
+            topicErrors.put(topic, Errors.forException(e));
+        }
+        return new MetadataResponse(topicErrors);
+    }
+
     public List<String> topics() {
         return topics;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 36736ec..44e2ce6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -67,6 +67,25 @@ public class MetadataResponse extends 
AbstractRequestResponse {
     private final Cluster cluster;
     private final Map<String, Errors> errors;
 
+    /* Constructor for error responses where most of the data, except error per topic, is irrelevant */
+    public MetadataResponse(Map<String, Errors> topicErrors) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(BROKERS_KEY_NAME, new ArrayList<Struct>().toArray());
+        List<Struct> topicArray = new ArrayList<Struct>();
+        for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) {
+            Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
+            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, topicError.getValue().code());
+            topicData.set(TOPIC_KEY_NAME, topicError.getKey());
+            topicData.set(PARTITION_METADATA_KEY_NAME, new ArrayList<Struct>().toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
+
+        this.errors = topicErrors;
+        this.cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+    }
+
     public MetadataResponse(Cluster cluster) {
         super(new Struct(CURRENT_SCHEMA));
 

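The new error-only constructor records one error code per topic and leaves the broker list and partition metadata empty, which is exactly what MetadataRequest.getErrorResponse needs. A minimal usage sketch (the topic name and error code below are arbitrary examples, not values from the patch):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;

public class MetadataErrorSketch {
    public static void main(String[] args) {
        // One error per topic; brokers and partition metadata stay empty.
        Map<String, Errors> topicErrors =
                Collections.singletonMap("topic1", Errors.UNKNOWN_TOPIC_OR_PARTITION);
        MetadataResponse errorOnly = new MetadataResponse(topicErrors);
    }
}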
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index b92f670..a0e1976 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -20,6 +20,7 @@ import java.util.Map;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -28,7 +29,8 @@ import org.apache.kafka.common.utils.CollectionUtils;
 /**
  * This wrapper supports both v0 and v1 of OffsetCommitRequest.
  */
-public class OffsetCommitRequest extends AbstractRequestResponse {
+public class OffsetCommitRequest extends AbstractRequest {
+    
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
@@ -213,6 +215,15 @@ public class OffsetCommitRequest extends 
AbstractRequestResponse {
         }
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
+            responseData.put(entry.getKey(), Errors.forException(e).code());
+        }
+        return new OffsetCommitResponse(responseData);
+    }
+
     public String groupId() {
         return groupId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 16c807c..deec1fa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -21,13 +22,14 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
  * This wrapper supports both v0 and v1 of OffsetFetchRequest.
  */
-public class OffsetFetchRequest extends AbstractRequestResponse {
+public class OffsetFetchRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
@@ -85,6 +87,19 @@ public class OffsetFetchRequest extends 
AbstractRequestResponse {
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+
+        for (TopicPartition partition: partitions) {
+            responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+                                                                              OffsetFetchResponse.NO_METADATA,
+                                                                              Errors.forException(e).code()));
+        }
+
+        return new OffsetFetchResponse(responseData);
+    }
+
     public String groupId() {
         return groupId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index f10c246..512a0ef 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -41,6 +41,9 @@ public class OffsetFetchResponse extends 
AbstractRequestResponse {
     private static final String METADATA_KEY_NAME = "metadata";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    public static final long INVALID_OFFSET = -1L;
+    public static final String NO_METADATA = "";
+
     /**
      * Possible error code:
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 995f89f..fabeae3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -26,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ProduceRequest  extends AbstractRequestResponse {
+public class ProduceRequest  extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
     private static final String ACKS_KEY_NAME = "acks";
@@ -88,6 +89,22 @@ public class ProduceRequest  extends AbstractRequestResponse 
{
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
     }
 
+    @Override
+    public AbstractRequestResponse getErrorResponse(Throwable e) {
+
+        /* In case the producer doesn't actually want any response */
+        if (acks == 0)
+            return null;
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+
+        for (Map.Entry<TopicPartition, ByteBuffer> entry: partitionRecords.entrySet()) {
+            responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET));
+        }
+
+        return new ProduceResponse(responseMap);
+    }
+
     public short acks() {
         return acks;
     }

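Note the acks == 0 case above: the producer expects no response at all, so getErrorResponse returns null and the caller has to close the connection instead of replying (see the KafkaApis change below). A small sketch of that null contract, reusing the request shape from RequestResponseTest (class name and values are illustrative):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.requests.ProduceRequest;

public class ProduceAcksZeroSketch {
    public static void main(String[] args) {
        Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
        produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));

        // acks = 0: the producer does not wait for a response.
        ProduceRequest request = new ProduceRequest((short) 0, 5000, produceData);

        AbstractRequestResponse error = request.getErrorResponse(new UnknownServerException());
        if (error == null) {
            // Nothing can be sent back; the broker closes the connection instead.
            System.out.println("no error response for acks=0; close the connection");
        }
    }
}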
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 4b67f70..37ec0b7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -38,6 +38,8 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
+    public static final long INVALID_OFFSET = -1L;
+
     /**
      * Possible error code:
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 61a767a..e3cc196 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.protocol.Errors;
 import org.junit.Test;
 
@@ -33,29 +34,38 @@ public class RequestResponseTest {
 
     @Test
     public void testSerialization() throws Exception {
-        List<AbstractRequestResponse> requestList = Arrays.asList(
+        List<AbstractRequestResponse> requestResponseList = Arrays.asList(
                 createRequestHeader(),
                 createResponseHeader(),
                 createConsumerMetadataRequest(),
+                createConsumerMetadataRequest().getErrorResponse(new UnknownServerException()),
                 createConsumerMetadataResponse(),
                 createFetchRequest(),
+                createFetchRequest().getErrorResponse(new UnknownServerException()),
                 createFetchResponse(),
                 createHeartBeatRequest(),
+                createHeartBeatRequest().getErrorResponse(new UnknownServerException()),
                 createHeartBeatResponse(),
                 createJoinGroupRequest(),
+                createJoinGroupRequest().getErrorResponse(new UnknownServerException()),
                 createJoinGroupResponse(),
                 createListOffsetRequest(),
+                createListOffsetRequest().getErrorResponse(new UnknownServerException()),
                 createListOffsetResponse(),
                 createMetadataRequest(),
+                createMetadataRequest().getErrorResponse(new UnknownServerException()),
                 createMetadataResponse(),
                 createOffsetCommitRequest(),
+                createOffsetCommitRequest().getErrorResponse(new UnknownServerException()),
                 createOffsetCommitResponse(),
                 createOffsetFetchRequest(),
+                createOffsetFetchRequest().getErrorResponse(new UnknownServerException()),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
+                createProduceRequest().getErrorResponse(new UnknownServerException()),
                 createProduceResponse());
 
-        for (AbstractRequestResponse req: requestList) {
+        for (AbstractRequestResponse req: requestResponseList) {
             ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
             req.writeTo(buffer);
             buffer.rewind();
@@ -75,7 +85,7 @@ public class RequestResponseTest {
         return new ResponseHeader(10);
     }
 
-    private AbstractRequestResponse createConsumerMetadataRequest() {
+    private AbstractRequest createConsumerMetadataRequest() {
         return new ConsumerMetadataRequest("test-group");
     }
 
@@ -83,7 +93,7 @@ public class RequestResponseTest {
         return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 
2014));
     }
 
-    private AbstractRequestResponse createFetchRequest() {
+    private AbstractRequest createFetchRequest() {
         Map<TopicPartition, FetchRequest.PartitionData> fetchData = new 
HashMap<TopicPartition, FetchRequest.PartitionData>();
         fetchData.put(new TopicPartition("test1", 0), new 
FetchRequest.PartitionData(100, 1000000));
         fetchData.put(new TopicPartition("test2", 0), new 
FetchRequest.PartitionData(200, 1000000));
@@ -96,7 +106,7 @@ public class RequestResponseTest {
         return new FetchResponse(responseData);
     }
 
-    private AbstractRequestResponse createHeartBeatRequest() {
+    private AbstractRequest createHeartBeatRequest() {
         return new HeartbeatRequest("group1", 1, "consumer1");
     }
 
@@ -104,7 +114,7 @@ public class RequestResponseTest {
         return new HeartbeatResponse(Errors.NONE.code());
     }
 
-    private AbstractRequestResponse createJoinGroupRequest() {
+    private AbstractRequest createJoinGroupRequest() {
         return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), 
"consumer1", "strategy1");
     }
 
@@ -112,7 +122,7 @@ public class RequestResponseTest {
         return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", 
Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
     }
 
-    private AbstractRequestResponse createListOffsetRequest() {
+    private AbstractRequest createListOffsetRequest() {
         Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new 
HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
         offsetData.put(new TopicPartition("test", 0), new 
ListOffsetRequest.PartitionData(1000000L, 10));
         return new ListOffsetRequest(-1, offsetData);
@@ -124,7 +134,7 @@ public class RequestResponseTest {
         return new ListOffsetResponse(responseData);
     }
 
-    private AbstractRequestResponse createMetadataRequest() {
+    private AbstractRequest createMetadataRequest() {
         return new MetadataRequest(Arrays.asList("topic1"));
     }
 
@@ -138,7 +148,7 @@ public class RequestResponseTest {
         return new MetadataResponse(cluster);
     }
 
-    private AbstractRequestResponse createOffsetCommitRequest() {
+    private AbstractRequest createOffsetCommitRequest() {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = 
new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
         commitData.put(new TopicPartition("test", 0), new 
OffsetCommitRequest.PartitionData(100, ""));
         return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, 
commitData);
@@ -150,7 +160,7 @@ public class RequestResponseTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private AbstractRequestResponse createOffsetFetchRequest() {
+    private AbstractRequest createOffsetFetchRequest() {
         return new OffsetFetchRequest("group1", Arrays.asList(new 
TopicPartition("test11", 1)));
     }
 
@@ -160,10 +170,10 @@ public class RequestResponseTest {
         return new OffsetFetchResponse(responseData);
     }
 
-    private AbstractRequestResponse createProduceRequest() {
+    private AbstractRequest createProduceRequest() {
         Map<TopicPartition, ByteBuffer> produceData = new 
HashMap<TopicPartition, ByteBuffer>();
         produceData.put(new TopicPartition("test", 0), 
ByteBuffer.allocate(10));
-        return new ProduceRequest(Errors.NONE.code(), 5000, produceData);
+        return new ProduceRequest((short) 1, 5000, produceData);
     }
 
     private AbstractRequestResponse createProduceResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
deleted file mode 100644
index f168d9f..0000000
--- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.ErrorMapping
-import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest}
-import kafka.api.ApiUtils._
-import kafka.network.RequestChannel.Response
-import scala.Some
-
-object HeartbeatRequestAndHeader {
-  def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val body = HeartbeatRequest.parse(buffer)
-    new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body)
-  }
-}
-
-case class HeartbeatRequestAndHeader(override val versionId: Short,
-                                     override val correlationId: Int,
-                                     override val clientId: String,
-                                     override val body: HeartbeatRequest)
-  extends GenericRequestAndHeader(versionId, correlationId, clientId, body, 
RequestKeys.nameForKey(RequestKeys.HeartbeatKey), 
Some(RequestKeys.HeartbeatKey)) {
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val errorResponseBody = new 
HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    val errorHeartBeatResponseAndHeader = new 
HeartbeatResponseAndHeader(correlationId, errorResponseBody)
-    requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
deleted file mode 100644
index 9a71faa..0000000
--- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import org.apache.kafka.common.requests.HeartbeatResponse
-import java.nio.ByteBuffer
-
-object HeartbeatResponseAndHeader {
-  def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = {
-    val correlationId = buffer.getInt
-    val body = HeartbeatResponse.parse(buffer)
-    new HeartbeatResponseAndHeader(correlationId, body)
-  }
-}
-
-case class HeartbeatResponseAndHeader(override val correlationId: Int, 
override val body: HeartbeatResponse)
-  extends GenericResponseAndHeader(correlationId, body, 
RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) {
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
deleted file mode 100644
index 3651e86..0000000
--- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.common.ErrorMapping
-import org.apache.kafka.common.requests._
-import kafka.api.ApiUtils._
-import kafka.network.RequestChannel.Response
-import scala.Some
-
-object JoinGroupRequestAndHeader {
-  def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val body = JoinGroupRequest.parse(buffer)
-    new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body)
-  }
-}
-
-case class JoinGroupRequestAndHeader(override val versionId: Short,
-                                     override val correlationId: Int,
-                                     override val clientId: String,
-                                     override val body: JoinGroupRequest)
-  extends GenericRequestAndHeader(versionId, correlationId, clientId, body, 
RequestKeys.nameForKey(RequestKeys.JoinGroupKey), 
Some(RequestKeys.JoinGroupKey)) {
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
-    val errorResponseBody = new 
JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    val errorHeartBeatResponseAndHeader = new 
JoinGroupResponseAndHeader(correlationId, errorResponseBody)
-    requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(errorHeartBeatResponseAndHeader)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
deleted file mode 100644
index d0f07e0..0000000
--- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import org.apache.kafka.common.requests.JoinGroupResponse
-import java.nio.ByteBuffer
-
-object JoinGroupResponseAndHeader {
-  def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = {
-    val correlationId = buffer.getInt
-    val body = JoinGroupResponse.parse(buffer)
-    new JoinGroupResponseAndHeader(correlationId, body)
-  }
-}
-
-case class JoinGroupResponseAndHeader(override val correlationId: Int, 
override val body: JoinGroupResponse)
-  extends GenericResponseAndHeader(correlationId, body, 
RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) {
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala 
b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..ef7a86e 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -46,9 +46,7 @@ object RequestKeys {
         ControlledShutdownKey -> ("ControlledShutdown", 
ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
-        JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
index 55ecac2..b95b73b 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
@@ -21,6 +21,7 @@ import java.nio._
 import java.nio.channels._
 import kafka.utils._
 import kafka.api.RequestOrResponse
+import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader}
 
 @nonthreadsafe
 private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends 
Send {
@@ -50,6 +51,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: 
ByteBuffer) extends Send
     buffer.rewind()
   }
 
+  def this(header: ResponseHeader, body: AbstractRequestResponse) = {
+    this(header.sizeOf + body.sizeOf)
+    header.writeTo(buffer)
+    body.writeTo(buffer)
+    buffer.rewind
+  }
+
   
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()

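The new constructor simply writes a client-side ResponseHeader followed by the response body into one buffer. The same framing, sketched in Java with types from this patch (the Scala constructor above is the real implementation; the class name and values here are illustrative):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.ResponseHeader;

public class ResponseFramingSketch {
    public static void main(String[] args) {
        ResponseHeader header = new ResponseHeader(10);                  // correlation id echoed to the client
        HeartbeatResponse body = new HeartbeatResponse(Errors.NONE.code());

        // Header first, then body: the layout BoundedByteBufferSend(header, body) produces.
        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
        header.writeTo(buffer);
        body.writeTo(buffer);
        buffer.rewind();
    }
}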
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7b1db3d..bc73540 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,6 +26,7 @@ import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
 import java.net._
+import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
 import org.apache.log4j.Logger
 
 
@@ -47,7 +48,23 @@ object RequestChannel extends Logging {
     @volatile var responseCompleteTimeMs = -1L
     @volatile var responseDequeueTimeMs = -1L
     val requestId = buffer.getShort()
-    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
+    val requestObj =
+      if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
+        RequestKeys.deserializerForKey(requestId)(buffer)
+      else
+        null
+    val header: RequestHeader =
+      if (requestObj == null) {
+        buffer.rewind
+        RequestHeader.parse(buffer)
+      } else
+        null
+    val body: AbstractRequest =
+      if (requestObj == null)
+        AbstractRequest.getRequest(header.apiKey, buffer)
+      else
+        null
+
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
     trace("Processor %d received request : %s".format(processor, requestObj))

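For requests that no longer have a Scala deserializer, Request now parses a common RequestHeader and hands the remaining bytes to AbstractRequest.getRequest. A self-contained sketch of that path in Java (the hand-built header assumes the standard request header layout of api key, api version, correlation id and length-prefixed client id; field values are arbitrary):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.RequestHeader;

public class RequestParseSketch {
    public static void main(String[] args) {
        // Build "header + body" bytes the way they arrive on the socket after the size prefix.
        HeartbeatRequest heartbeat = new HeartbeatRequest("group1", 1, "consumer1");
        byte[] clientId = "sketch".getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 4 + 2 + clientId.length + heartbeat.sizeOf());
        buffer.putShort(ApiKeys.HEARTBEAT.id);        // api key
        buffer.putShort((short) 0);                   // api version
        buffer.putInt(42);                            // correlation id
        buffer.putShort((short) clientId.length);     // client id (length-prefixed string)
        buffer.put(clientId);
        heartbeat.writeTo(buffer);
        buffer.rewind();

        // The new path in RequestChannel.Request: header first, then the factory keyed on its api key.
        RequestHeader header = RequestHeader.parse(buffer);
        AbstractRequest body = AbstractRequest.getRequest(header.apiKey(), buffer);
    }
}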
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c33e848..f372af7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,8 +17,7 @@
 
 package kafka.server
 
-import org.apache.kafka.common.requests.JoinGroupResponse
-import org.apache.kafka.common.requests.HeartbeatResponse
+import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader}
 import org.apache.kafka.common.TopicPartition
 
 import kafka.api._
@@ -74,7 +73,19 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: Throwable =>
-        request.requestObj.handleError(e, requestChannel, request)
+        if ( request.requestObj != null)
+          request.requestObj.handleError(e, requestChannel, request)
+        else {
+          val response = request.body.getErrorResponse(e)
+          val respHeader = new ResponseHeader(request.header.correlationId)
+
+          /* If request doesn't have a default error response, we just close the connection.
+             For example, when produce request has acks set to 0 */
+          if (response == null)
+            requestChannel.closeConnection(request.processor, request)
+          else
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response)))
+        }
         error("error when handling request %s".format(request.requestObj), e)
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
@@ -484,40 +495,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleJoinGroupRequest(request: RequestChannel.Request) {
     import JavaConversions._
 
-    val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
+    val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
+    val respHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a join-group response
     def sendResponseCallback(partitions: List[TopicAndPartition], 
generationId: Int, errorCode: Short) {
       val partitionList = partitions.map(tp => new TopicPartition(tp.topic, 
tp.partition)).toBuffer
-      val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList)
-      val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody)))
     }
 
     // let the coordinator to handle join-group
     coordinator.consumerJoinGroup(
-      joinGroupRequest.body.groupId(),
-      joinGroupRequest.body.consumerId(),
-      joinGroupRequest.body.topics().toList,
-      joinGroupRequest.body.sessionTimeout(),
-      joinGroupRequest.body.strategy(),
+      joinGroupRequest.groupId(),
+      joinGroupRequest.consumerId(),
+      joinGroupRequest.topics().toList,
+      joinGroupRequest.sessionTimeout(),
+      joinGroupRequest.strategy(),
       sendResponseCallback)
   }
 
   def handleHeartbeatRequest(request: RequestChannel.Request) {
-    val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
+    val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
+    val respHeader = new ResponseHeader(request.header.correlationId)
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(errorCode: Short) {
-      val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      val response = new HeartbeatResponse(errorCode)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response)))
     }
 
     // let the coordinator to handle heartbeat
     coordinator.consumerHeartbeat(
-      heartbeatRequest.body.groupId(),
-      heartbeatRequest.body.consumerId(),
-      heartbeatRequest.body.groupGenerationId(),
+      heartbeatRequest.groupId(),
+      heartbeatRequest.consumerId(),
+      heartbeatRequest.groupGenerationId(),
       sendResponseCallback)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fe98ef/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 4cb803f..030faac 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -208,28 +208,6 @@ object SerializationTestUtils {
   def createConsumerMetadataResponse: ConsumerMetadataResponse = {
     ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0)
   }
-
-  def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
-    val body = new HeartbeatRequest("group1", 1, "consumer1")
-    HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
-  }
-
-  def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = {
-    val body = new HeartbeatResponse(0.asInstanceOf[Short])
-    HeartbeatResponseAndHeader(1, body)
-  }
-
-  def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = {
-    import scala.collection.JavaConversions._
-    val body = new JoinGroupRequest("group1", 30000, List("topic1"), 
"consumer1", "strategy1");
-    JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body)
-  }
-
-  def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = {
-    import scala.collection.JavaConversions._
-    val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", 
List(new TopicPartition("test11", 1)))
-    JoinGroupResponseAndHeader(1, body)
-  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -253,10 +231,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val consumerMetadataRequest = 
SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = 
SerializationTestUtils.createConsumerMetadataResponse
   private val consumerMetadataResponseNoCoordinator = 
ConsumerMetadataResponse(None, 
ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
-  private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader
-  private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
-  private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
-  private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
 
   @Test
   def testSerializationAndDeserialization() {
@@ -269,8 +243,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitRequestV0, offsetCommitRequestV1, 
offsetCommitRequestV2,
                                offsetCommitResponse, offsetFetchRequest, 
offsetFetchResponse,
                                consumerMetadataRequest, 
consumerMetadataResponse,
-                               consumerMetadataResponseNoCoordinator, heartbeatRequest,
-                               heartbeatResponse, joinGroupRequest, joinGroupResponse)
+                               consumerMetadataResponseNoCoordinator)
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)
