KAFKA-870 hadoop-producer KafkaRecordWriter writes entire input buffer 
capacity, even when intended payload is smaller; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1976ce80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1976ce80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1976ce80

Branch: refs/heads/trunk
Commit: 1976ce8069818c2c6d093b5b75e8f7b6653120bc
Parents: f1d2141
Author: David Stein <[email protected]>
Authored: Mon Apr 22 19:19:13 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Mon Apr 22 19:20:01 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1976ce80/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index a381ccd..6eea635 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -17,6 +17,7 @@
 package kafka.bridge.hadoop;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
@@ -62,7 +63,9 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
     if (value instanceof byte[])
       valBytes = (byte[]) value;
     else if (value instanceof BytesWritable)
-      valBytes = ((BytesWritable) value).getBytes();
+      // BytesWritable.getBytes returns its internal buffer, so .length would 
refer to its capacity, not the
+      // intended size of the byte array contained.  We need to use 
BytesWritable.getLength for the true size.
+      valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), 
((BytesWritable) value).getLength());
     else
       throw new IllegalArgumentException("KafkaRecordWriter expects byte array 
value to publish");
 

Reply via email to