KAFKA-870: hadoop-producer KafkaRecordWriter writes the entire input buffer capacity, even when the intended payload is smaller; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1976ce80 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1976ce80 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1976ce80 Branch: refs/heads/trunk Commit: 1976ce8069818c2c6d093b5b75e8f7b6653120bc Parents: f1d2141 Author: David Stein <[email protected]> Authored: Mon Apr 22 19:19:13 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Mon Apr 22 19:20:01 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1976ce80/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java index a381ccd..6eea635 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java @@ -17,6 +17,7 @@ package kafka.bridge.hadoop; import java.io.IOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import kafka.javaapi.producer.Producer; @@ -62,7 +63,9 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V> if (value instanceof byte[]) valBytes = (byte[]) value; else if (value instanceof BytesWritable) - valBytes = ((BytesWritable) value).getBytes(); + // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the + // intended size of the byte array contained. We need to use BytesWritable.getLength for the true size. 
+ valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength()); else throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
