Repository: spark Updated Branches: refs/heads/branch-1.0 86c4a79dc -> ac8c27bdf
[SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not re... flume event sent to Spark will fail if the body is too large and numHeaders is greater than zero Author: joyyoj <suns...@gmail.com> Closes #951 from joyyoj/master and squashes the following commits: f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly (cherry picked from commit 29660443077619ee854025b8d0d3d64181724054) Signed-off-by: Patrick Wendell <pwend...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac8c27bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac8c27bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac8c27bd Branch: refs/heads/branch-1.0 Commit: ac8c27bdffc22d01afc049a64648237fdc607e66 Parents: 86c4a79 Author: joyyoj <suns...@gmail.com> Authored: Tue Jun 10 17:26:17 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Tue Jun 10 17:26:31 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/streaming/flume/FlumeInputDStream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ac8c27bd/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 5be33f1..ed35e34 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -71,12 +71,12 @@ class SparkFlumeEvent() extends Externalizable { for (i <- 0 until numHeaders) { val keyLength = in.readInt() val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) + in.readFully(keyBuff) val key : String = Utils.deserialize(keyBuff) val valLength = in.readInt() val valBuff = new Array[Byte](valLength) - in.read(valBuff) + in.readFully(valBuff) val value : String = Utils.deserialize(valBuff) headers.put(key, value)