Repository: spark
Updated Branches:
  refs/heads/branch-0.9 763394951 -> 234a3786a


Spark 1916

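The changes could be ported back to 0.9 as well.
Change in.read to in.readFully so that the whole event body (bodyLength bytes)
is read rather than only the first 1020 bytes. A single in.read call may return
fewer bytes than the buffer holds; in.readFully keeps reading until the buffer
is full.
This should be OK considering that Flume caps the body size to 32K by default.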
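
To illustrate the difference, here is a minimal sketch rather than the actual
Spark code. ChunkedStream, ReadVsReadFully, the 4096-byte payload and the
1020-byte chunk size are all hypothetical names and values chosen to mirror the
numbers above, and java.io.DataInputStream stands in for the ObjectInput that
readExternal receives.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, InputStream}

// Hypothetical stream that hands back at most `chunk` bytes per bulk read,
// imitating a source that delivers its data in pieces.
class ChunkedStream(data: Array[Byte], chunk: Int) extends InputStream {
  private val underlying = new ByteArrayInputStream(data)

  override def read(): Int = underlying.read()

  override def read(b: Array[Byte], off: Int, len: Int): Int =
    underlying.read(b, off, math.min(len, chunk))
}

object ReadVsReadFully {
  // Illustrative payload, larger than one chunk.
  val payload: Array[Byte] = Array.tabulate[Byte](4096)(i => (i % 127).toByte)

  // A single read call: returns how many bytes were actually copied,
  // which can be fewer than the buffer size.
  def partialRead(): Int = {
    val in = new DataInputStream(new ChunkedStream(payload, 1020))
    val buf = new Array[Byte](payload.length)
    in.read(buf)
  }

  // readFully loops internally until the buffer is full,
  // or throws EOFException if the stream ends first.
  def fullRead(): Array[Byte] = {
    val in = new DataInputStream(new ChunkedStream(payload, 1020))
    val buf = new Array[Byte](payload.length)
    in.readFully(buf)
    buf
  }
}
```

With this stand-in stream, partialRead() copies only one chunk and leaves the
rest of the buffer zeroed, while fullRead() returns a buffer equal to the
payload.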

Author: David Lemieux <[email protected]>

Closes #865 from lemieud/SPARK-1916 and squashes the following commits:

a265673 [David Lemieux] Updated SparkFlumeEvent to read the whole stream rather than the first X bytes.
(cherry picked from commit 0b769b73fb7ae314325857138a2d3138ed157908)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/234a3786
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/234a3786
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/234a3786

Branch: refs/heads/branch-0.9
Commit: 234a3786ac8c4ece22083be67c340ac0e4484e01
Parents: 7633949
Author: David Lemieux <[email protected]>
Authored: Wed May 28 15:50:35 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed May 28 15:51:43 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/234a3786/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ce3ef47..bda388e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -61,7 +61,7 @@ class SparkFlumeEvent() extends Externalizable {
   def readExternal(in: ObjectInput) {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
-    in.read(bodyBuff)
+    in.readFully(bodyBuff)
 
     val numHeaders = in.readInt()
     val headers = new java.util.HashMap[CharSequence, CharSequence]
