Updated Branches:
  refs/heads/master fb6875dd5 -> 743a31a7c

Add HTTP read timeout for HttpBroadcast


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/db998a6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/db998a6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/db998a6e

Branch: refs/heads/master
Commit: db998a6e14389768f93b1fdd6be7847d5f7604fd
Parents: 18d6df0
Author: haitao.yao <[email protected]>
Authored: Tue Nov 26 18:23:48 2013 +0800
Committer: haitao.yao <[email protected]>
Committed: Tue Nov 26 18:23:48 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/broadcast/HttpBroadcast.scala  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/db998a6e/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 609464e..47db720 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
 
 import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
 import java.net.URL
+import java.util.concurrent.TimeUnit
 
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
   private val files = new TimeStampedHashSet[String]
  private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
 
+  private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+
   private lazy val compressionCodec = CompressionCodec.createCodec()
 
   def initialize(isDriver: Boolean) {
@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
   def read[T](id: Long): T = {
     val url = serverUri + "/" + BroadcastBlockId(id).name
     val in = {
+      val httpConnection = new URL(url).openConnection()
+      httpConnection.setReadTimeout(httpReadTimeout)
+      val inputStream = httpConnection.getInputStream()
       if (compress) {
-        compressionCodec.compressedInputStream(new URL(url).openStream())
+        compressionCodec.compressedInputStream(inputStream)
       } else {
-        new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+        new FastBufferedInputStream(inputStream, bufferSize)
       }
     }
     val ser = SparkEnv.get.serializer.newInstance()

Reply via email to