Repository: spark
Updated Branches:
  refs/heads/branch-1.0 86c4a79dc -> ac8c27bdf


[SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not re...

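A Flume event sent to Spark is not read properly if its body is too large and
numHeaders is greater than zero. The header key and value buffers are now
filled with in.readFully instead of in.read.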

Author: joyyoj <suns...@gmail.com>

Closes #951 from joyyoj/master and squashes the following commits:

f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes 
are not read properly
(cherry picked from commit 29660443077619ee854025b8d0d3d64181724054)

Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac8c27bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac8c27bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac8c27bd

Branch: refs/heads/branch-1.0
Commit: ac8c27bdffc22d01afc049a64648237fdc607e66
Parents: 86c4a79
Author: joyyoj <suns...@gmail.com>
Authored: Tue Jun 10 17:26:17 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Jun 10 17:26:31 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/flume/FlumeInputDStream.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac8c27bd/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 5be33f1..ed35e34 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -71,12 +71,12 @@ class SparkFlumeEvent() extends Externalizable {
     for (i <- 0 until numHeaders) {
       val keyLength = in.readInt()
       val keyBuff = new Array[Byte](keyLength)
-      in.read(keyBuff)
+      in.readFully(keyBuff)
       val key : String = Utils.deserialize(keyBuff)
 
       val valLength = in.readInt()
       val valBuff = new Array[Byte](valLength)
-      in.read(valBuff)
+      in.readFully(valBuff)
       val value : String = Utils.deserialize(valBuff)
 
       headers.put(key, value)
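
The difference between read and readFully in the hunk above can be sketched in isolation. A single read(buf) call may return after copying fewer bytes than the buffer holds, while readFully(buf) keeps reading until the buffer is full or throws EOFException. The following is a minimal, self-contained sketch and not code from this commit: ReadFullyDemo, ChunkedStream, the 2048-byte payload, and the 1024-byte chunk size are all hypothetical names and values chosen for illustration, and the real stream passed to SparkFlumeEvent may chunk its data differently.

```scala
import java.io.{DataInputStream, InputStream}

object ReadFullyDemo {
  /** Hypothetical stream that returns at most `chunk` bytes per bulk read,
    * imitating a source that delivers its data in blocks. */
  class ChunkedStream(data: Array[Byte], chunk: Int) extends InputStream {
    private var pos = 0

    override def read(): Int =
      if (pos >= data.length) -1
      else { val b = data(pos) & 0xff; pos += 1; b }

    override def read(buf: Array[Byte], off: Int, len: Int): Int =
      if (len == 0) 0
      else if (pos >= data.length) -1
      else {
        // Copy no more than one chunk, even if the caller asked for more.
        val n = math.min(math.min(len, chunk), data.length - pos)
        System.arraycopy(data, pos, buf, off, n)
        pos += n
        n
      }
  }

  // Illustrative payload, larger than one chunk.
  val payload: Array[Byte] = Array.tabulate(2048)(i => (i % 251).toByte)

  /** One read call: returns the number of bytes actually copied,
    * which can be smaller than the buffer length. */
  def bytesFromRead(): Int = {
    val in = new DataInputStream(new ChunkedStream(payload, 1024))
    in.read(new Array[Byte](payload.length))
  }

  /** readFully loops internally until the whole buffer is filled. */
  def filledByReadFully(): Boolean = {
    val in = new DataInputStream(new ChunkedStream(payload, 1024))
    val buf = new Array[Byte](payload.length)
    in.readFully(buf)
    java.util.Arrays.equals(buf, payload)
  }
}
```

With this hypothetical stream, bytesFromRead() reports that only the first chunk was copied, leaving the rest of the buffer zeroed, whereas filledByReadFully() recovers the whole payload. A partially filled keyBuff or valBuff would likewise be passed to Utils.deserialize with trailing zero bytes, which is the failure the patch avoids.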
