spark git commit: [SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 48dcee484 -> 4df1dd403


[SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in 
assertOrderInvariantEquals method

org.apache.spark.streaming.Java8APISuite.java is failing because its assertOrderInvariantEquals method tries to sort immutable lists in place.
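
The root cause and the fix pattern, as a minimal Scala sketch against the JDK collection API (the object and method names here are illustrative, not the suite's code):

```
import java.util.{ArrayList, Collections, List => JList}
import scala.collection.JavaConverters._

object SortSketch {
  // Sorting an unmodifiable inner list in place throws UnsupportedOperationException.
  def sortInPlace(lists: JList[JList[Integer]]): Unit =
    lists.asScala.foreach(list => Collections.sort(list))

  // The fix: sort a mutable copy of each inner list and compare the copies instead.
  def sortedCopies(lists: JList[JList[Integer]]): JList[JList[Integer]] = {
    val out = new ArrayList[JList[Integer]]()
    lists.asScala.foreach { inner =>
      val copy = new ArrayList[Integer](inner) // defensive copy is always mutable
      Collections.sort(copy)
      out.add(copy)
    }
    out
  }
}
```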

Author: Evan Chen 

Closes #10336 from evanyc15/SPARK-12376-StreamingJavaAPISuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4df1dd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4df1dd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4df1dd40

Branch: refs/heads/branch-1.6
Commit: 4df1dd403441a4e4ca056d294385d8d0d8a0c65d
Parents: 48dcee4
Author: Evan Chen 
Authored: Thu Dec 17 14:22:30 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 14:23:45 2015 -0800

--
 .../java/org/apache/spark/streaming/Java8APISuite.java   | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4df1dd40/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
--
diff --git 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 89e0c7f..e8a0dfc 100644
--- 
a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ 
b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -439,9 +439,14 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
       List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach((List<T> list) -> Collections.sort(list));
-    actual.forEach((List<T> list) -> Collections.sort(list));
-    Assert.assertEquals(expected, actual);
+    expected.forEach(list -> Collections.sort(list));
+    List<List<T>> sortedActual = new ArrayList<>();
+    actual.forEach(list -> {
+      List<T> sortedList = new ArrayList<>(list);
+      Collections.sort(sortedList);
+      sortedActual.add(sortedList);
+    });
+    Assert.assertEquals(expected, sortedActual);
   }
 
   @Test


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 818456881 -> 540b5aead


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".
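
A small illustration of the pitfall (the sample strings are made up):

```
// String.split takes a regex: "|" is an empty alternation and "." matches any
// character, so the unescaped forms split in the wrong places.
"Action|Comedy".split("|").toList      // splits between every character, not at the pipe
"Action|Comedy".split("\\|").toList    // List(Action, Comedy)
"a.b.c".split(".").toList              // List() -- every character matched, empties dropped
"a.b.c".split("\\.").toList            // List(a, b, c)
```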

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10361 from zsxwing/reg-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540b5aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540b5aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540b5aea

Branch: refs/heads/master
Commit: 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9
Parents: 8184568
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 17 13:23:48 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 88bbb5429 -> c0ab14fbe


[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ab14fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ab14fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ab14fb

Branch: refs/heads/branch-1.6
Commit: c0ab14fbeab2a81d174c3643a4fcc915ff2902e8
Parents: 88bbb54
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 17 13:23:58 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
 def parseMovie(str: String): Movie = {
   val fields = str.split("::")
   assert(fields.size == 3)
-  Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+  Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 658f66e62 -> f4346f612


[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when 
recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and runs the restore process only when it is `false`.
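
A minimal standalone sketch of the guard pattern (hypothetical class, not Spark's DStream): a @transient flag defaults back to false after deserialization, so each recovered object runs the restore logic at most once.

```
class Restorable extends Serializable {
  @transient private var restored = false

  def restore(): Unit = {
    if (!restored) {
      // recreate in-memory state from checkpointed data here
      restored = true
    }
    // subsequent calls are no-ops, avoiding duplicate restore work
  }
}
```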

Author: jhu-chang 

Closes #9765 from jhu-chang/SPARK-11749.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4346f61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4346f61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4346f61

Branch: refs/heads/master
Commit: f4346f612b6798517153a786f9172cf41618d34d
Parents: 658f66e
Author: jhu-chang 
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 17:53:15 2015 -0800

--
 .../spark/streaming/dstream/DStream.scala   | 15 --
 .../spark/streaming/CheckpointSuite.scala   | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
    * override the updateCheckpointData() method would also need to override this method.
    */
   private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
+    if (!restoredFromCheckpointData) {
+      // Create RDDs from the checkpoint data
+      logInfo("Restoring checkpoint data")
+      checkpointData.restore()
+      dependencies.foreach(_.restoreCheckpointData())
+      restoredFromCheckpointData = true
+      logInfo("Restored checkpoint data")
+    }
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ObjectOutputStream, ByteArrayOutputStream, 
ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, 
ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends 
InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new 
FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  override def stop(): Unit = { }
+  override def compute(time: Time): Option[RDD[Int]] = 
Some(ssc.sc.makeRDD(Seq(1)))
+  private[streaming]

spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4df1dd403 -> 9177ea383


[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when 
recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and runs the restore process only when it is `false`.

Author: jhu-chang 

Closes #9765 from jhu-chang/SPARK-11749.

(cherry picked from commit f4346f612b6798517153a786f9172cf41618d34d)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9177ea38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9177ea38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9177ea38

Branch: refs/heads/branch-1.6
Commit: 9177ea383a29653f0591a59e1ee2dff6b87d5a1c
Parents: 4df1dd4
Author: jhu-chang 
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Thu Dec 17 17:54:14 2015 -0800

--
 .../spark/streaming/dstream/DStream.scala   | 15 --
 .../spark/streaming/CheckpointSuite.scala   | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false
 
   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
    * override the updateCheckpointData() method would also need to override this method.
    */
   private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
+    if (!restoredFromCheckpointData) {
+      // Create RDDs from the checkpoint data
+      logInfo("Restoring checkpoint data")
+      checkpointData.restore()
+      dependencies.foreach(_.restoreCheckpointData())
+      restoredFromCheckpointData = true
+      logInfo("Restored checkpoint data")
+    }
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ObjectOutputStream, ByteArrayOutputStream, 
ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, 
ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends 
InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new 
FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  

spark git commit: [MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'

2015-12-17 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f4346f612 -> 0370abdfd


[MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'

Hide the error logs for 'SQLListenerMemoryLeakSuite' to avoid noise. Most of the changes are whitespace-only (re-indentation).

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10363 from zsxwing/hide-log.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0370abdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0370abdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0370abdf

Branch: refs/heads/master
Commit: 0370abdfd636566cd8df954c6f9ea5a794d275ef
Parents: f4346f6
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 17 18:18:12 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 17 18:18:12 2015 -0800

--
 .../sql/execution/ui/SQLListenerSuite.scala | 64 +++-
 1 file changed, 35 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0370abdf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 12a4e13..11a6ce9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -336,39 +336,45 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
 class SQLListenerMemoryLeakSuite extends SparkFunSuite {
 
   test("no memory leak") {
-val conf = new SparkConf()
-  .setMaster("local")
-  .setAppName("test")
-  .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this 
test quickly
-  .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
-val sc = new SparkContext(conf)
+val oldLogLevel = org.apache.log4j.Logger.getRootLogger().getLevel()
 try {
-  SQLContext.clearSqlListener()
-  val sqlContext = new SQLContext(sc)
-  import sqlContext.implicits._
-  // Run 100 successful executions and 100 failed executions.
-  // Each execution only has one job and one stage.
-  for (i <- 0 until 100) {
-val df = Seq(
-  (1, 1),
-  (2, 2)
-).toDF()
-df.collect()
-try {
-  df.foreach(_ => throw new RuntimeException("Oops"))
-} catch {
-  case e: SparkException => // This is expected for a failed job
+  
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.FATAL)
+  val conf = new SparkConf()
+.setMaster("local")
+.setAppName("test")
+.set("spark.task.maxFailures", "1") // Don't retry the tasks to run 
this test quickly
+.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
+  val sc = new SparkContext(conf)
+  try {
+SQLContext.clearSqlListener()
+val sqlContext = new SQLContext(sc)
+import sqlContext.implicits._
+// Run 100 successful executions and 100 failed executions.
+// Each execution only has one job and one stage.
+for (i <- 0 until 100) {
+  val df = Seq(
+(1, 1),
+(2, 2)
+  ).toDF()
+  df.collect()
+  try {
+df.foreach(_ => throw new RuntimeException("Oops"))
+  } catch {
+case e: SparkException => // This is expected for a failed job
+  }
 }
+sc.listenerBus.waitUntilEmpty(1)
+assert(sqlContext.listener.getCompletedExecutions.size <= 50)
+assert(sqlContext.listener.getFailedExecutions.size <= 50)
+// 50 for successful executions and 50 for failed executions
+assert(sqlContext.listener.executionIdToData.size <= 100)
+assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
+assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
+  } finally {
+sc.stop()
   }
-  sc.listenerBus.waitUntilEmpty(1)
-  assert(sqlContext.listener.getCompletedExecutions.size <= 50)
-  assert(sqlContext.listener.getFailedExecutions.size <= 50)
-  // 50 for successful executions and 50 for failed executions
-  assert(sqlContext.listener.executionIdToData.size <= 100)
-  assert(sqlContex

spark git commit: [MINOR] Add missing interpolation in NettyRPCEnv

2015-12-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 552b38f87 -> 638b89bc3


[MINOR] Add missing interpolation in NettyRPCEnv

```
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException:
Cannot receive any reply in ${timeout.duration}. This timeout is controlled by 
spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
```
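
The fix is the missing `s` interpolator prefix; a tiny illustration (values are examples):

```
val timeout = "120 seconds"
"Cannot receive any reply in ${timeout}"   // literal: Cannot receive any reply in ${timeout}
s"Cannot receive any reply in ${timeout}"  // interpolated: Cannot receive any reply in 120 seconds
```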

Author: Andrew Or 

Closes #10334 from andrewor14/rpc-typo.

(cherry picked from commit 861549acdbc11920cde51fc57752a8bc241064e5)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/638b89bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/638b89bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/638b89bc

Branch: refs/heads/branch-1.6
Commit: 638b89bc3b1c421fe11cbaf52649225662d3d3ce
Parents: 552b38f
Author: Andrew Or 
Authored: Wed Dec 16 16:13:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 16 16:13:55 2015 -0800

--
 core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/638b89bc/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 9d353bb..a53bc5e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -239,7 +239,7 @@ private[netty] class NettyRpcEnv(
     val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
       override def run(): Unit = {
         promise.tryFailure(
-          new TimeoutException("Cannot receive any reply in ${timeout.duration}"))
+          new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
       }
     }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
     promise.future.onComplete { v =>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR] Add missing interpolation in NettyRPCEnv

2015-12-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 27b98e99d -> 861549acd


[MINOR] Add missing interpolation in NettyRPCEnv

```
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException:
Cannot receive any reply in ${timeout.duration}. This timeout is controlled by 
spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
```

Author: Andrew Or 

Closes #10334 from andrewor14/rpc-typo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/861549ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/861549ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/861549ac

Branch: refs/heads/master
Commit: 861549acdbc11920cde51fc57752a8bc241064e5
Parents: 27b98e9
Author: Andrew Or 
Authored: Wed Dec 16 16:13:48 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 16 16:13:48 2015 -0800

--
 core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/861549ac/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index f82fd4e..de3db6b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv(
     val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
       override def run(): Unit = {
         promise.tryFailure(
-          new TimeoutException("Cannot receive any reply in ${timeout.duration}"))
+          new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
       }
     }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
     promise.future.onComplete { v =>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None

2015-12-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 97678edea -> 437583f69


[SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when 
invFunc is None

When invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent to

    reduceByKey(func).window(winsize, slidesize).reduceByKey(func)

and no checkpoint is necessary. The corresponding Scala code does exactly that, 
but Python code always creates a windowed stream with obligatory checkpointing. 
The patch fixes this.

I do not know how to unit-test this.
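
A minimal Scala sketch of the equivalence being relied on (the local socket source and the durations are assumptions for illustration only): with no inverse reduce function, the windowed reduction is just reduce, window, reduce again, so no checkpoint directory is required.

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowEquivalenceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("window-equivalence")
    val ssc = new StreamingContext(conf, Seconds(1))
    val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    // Equivalent forms when no inverse function is supplied:
    val viaWindow = pairs.reduceByKey(_ + _).window(Seconds(30), Seconds(10)).reduceByKey(_ + _)
    val direct = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    viaWindow.print()
    direct.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The patch makes the Python reduceByKeyAndWindow take the same shortcut when invFunc is None, instead of always building the checkpointed windowed stream.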

Author: David Tolpin 

Closes #9888 from dtolpin/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/437583f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/437583f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/437583f6

Branch: refs/heads/master
Commit: 437583f692e30b8dc03b339a34e92595d7b992ba
Parents: 97678ed
Author: David Tolpin 
Authored: Wed Dec 16 22:10:24 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 16 22:10:24 2015 -0800

--
 python/pyspark/streaming/dstream.py | 45 
 1 file changed, 23 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/437583f6/python/pyspark/streaming/dstream.py
--
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index f61137c..b994a53 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -542,31 +542,32 @@ class DStream(object):
 
         reduced = self.reduceByKey(func, numPartitions)
 
-        def reduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            r = a.union(b).reduceByKey(func, numPartitions) if a else b
-            if filterFunc:
-                r = r.filter(filterFunc)
-            return r
-
-        def invReduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            joined = a.leftOuterJoin(b, numPartitions)
-            return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
-                                    if kv[1] is not None else kv[0])
-
-        jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
         if invFunc:
+            def reduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                r = a.union(b).reduceByKey(func, numPartitions) if a else b
+                if filterFunc:
+                    r = r.filter(filterFunc)
+                return r
+
+            def invReduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                joined = a.leftOuterJoin(b, numPartitions)
+                return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+                                        if kv[1] is not None else kv[0])
+
+            jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
+            if slideDuration is None:
+                slideDuration = self._slideDuration
+            dstream = self._sc._jvm.PythonReducedWindowedDStream(
+                reduced._jdstream.dstream(),
+                jreduceFunc, jinvReduceFunc,
+                self._ssc._jduration(windowDuration),
+                self._ssc._jduration(slideDuration))
+            return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
         else:
-            jinvReduceFunc = None
-        if slideDuration is None:
-            slideDuration = self._slideDuration
-        dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
-                                                             jreduceFunc, jinvReduceFunc,
-                                                             self._ssc._jduration(windowDuration),
-                                                             self._ssc._jduration(slideDuration))
-        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
+            return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
 
     def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
 """


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][MINOR] Fix typo in function name of StateImpl

2015-12-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c59df8c51 -> bc1ff9f4a


[STREAMING][MINOR] Fix typo in function name of StateImpl

cc\ tdas zsxwing , please review. Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10305 from jerryshao/fix-typo-state-impl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc1ff9f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc1ff9f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc1ff9f4

Branch: refs/heads/master
Commit: bc1ff9f4a41401599d3a87fb3c23a2078228a29b
Parents: c59df8c
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Dec 15 09:41:40 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 15 09:41:40 2015 -0800

--
 streaming/src/main/scala/org/apache/spark/streaming/State.scala| 2 +-
 .../scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala | 2 +-
 .../test/scala/org/apache/spark/streaming/MapWithStateSuite.scala  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc1ff9f4/streaming/src/main/scala/org/apache/spark/streaming/State.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/State.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
index b47bdda..42424d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/State.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
@@ -206,7 +206,7 @@ private[streaming] class StateImpl[S] extends State[S] {
* Update the internal data and flags in `this` to the given state that is 
going to be timed out.
* This method allows `this` object to be reused across many state records.
*/
-  def wrapTiminoutState(newState: S): Unit = {
+  def wrapTimingOutState(newState: S): Unit = {
 this.state = newState
 defined = true
 timingOut = true

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1ff9f4/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
index ed95171..fdf6167 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
@@ -67,7 +67,7 @@ private[streaming] object MapWithStateRDDRecord {
 // data returned
 if (removeTimedoutData && timeoutThresholdTime.isDefined) {
   newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, 
state, _) =>
-wrappedState.wrapTiminoutState(state)
+wrappedState.wrapTimingOutState(state)
 val returned = mappingFunction(batchTime, key, None, wrappedState)
 mappedData ++= returned
 newStateMap.remove(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1ff9f4/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 4b08085..6b21433 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -125,7 +125,7 @@ class MapWithStateSuite extends SparkFunSuite
 state.remove()
 testState(None, shouldBeRemoved = true)
 
-state.wrapTiminoutState(3)
+state.wrapTimingOutState(3)
 testState(Some(3), shouldBeTimingOut = true)
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][MINOR] Fix typo in function name of StateImpl

2015-12-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 352a0c80f -> 23c884605


[STREAMING][MINOR] Fix typo in function name of StateImpl

cc\ tdas zsxwing , please review. Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10305 from jerryshao/fix-typo-state-impl.

(cherry picked from commit bc1ff9f4a41401599d3a87fb3c23a2078228a29b)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23c88460
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23c88460
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23c88460

Branch: refs/heads/branch-1.6
Commit: 23c8846050b307fdfe2307f7e7ca9d0f69f969a9
Parents: 352a0c8
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Dec 15 09:41:40 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 15 09:41:50 2015 -0800

--
 streaming/src/main/scala/org/apache/spark/streaming/State.scala| 2 +-
 .../scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala | 2 +-
 .../test/scala/org/apache/spark/streaming/MapWithStateSuite.scala  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23c88460/streaming/src/main/scala/org/apache/spark/streaming/State.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/State.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
index b47bdda..42424d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/State.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/State.scala
@@ -206,7 +206,7 @@ private[streaming] class StateImpl[S] extends State[S] {
* Update the internal data and flags in `this` to the given state that is 
going to be timed out.
* This method allows `this` object to be reused across many state records.
*/
-  def wrapTiminoutState(newState: S): Unit = {
+  def wrapTimingOutState(newState: S): Unit = {
 this.state = newState
 defined = true
 timingOut = true

http://git-wip-us.apache.org/repos/asf/spark/blob/23c88460/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
index ed95171..fdf6167 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
@@ -67,7 +67,7 @@ private[streaming] object MapWithStateRDDRecord {
 // data returned
 if (removeTimedoutData && timeoutThresholdTime.isDefined) {
   newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, 
state, _) =>
-wrappedState.wrapTiminoutState(state)
+wrappedState.wrapTimingOutState(state)
 val returned = mappingFunction(batchTime, key, None, wrappedState)
 mappedData ++= returned
 newStateMap.remove(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/23c88460/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 4b08085..6b21433 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -125,7 +125,7 @@ class MapWithStateSuite extends SparkFunSuite
 state.remove()
 testState(None, shouldBeRemoved = true)
 
-state.wrapTiminoutState(3)
+state.wrapTimingOutState(3)
 testState(Some(3), shouldBeTimingOut = true)
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12304][STREAMING] Make Spark Streaming web UI display more fri…

2015-12-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ca0690b5e -> d52bf47e1


[SPARK-12304][STREAMING] Make Spark Streaming web UI display more friendly Receiver graphs

Currently, the Spark Streaming web UI uses the same maxY when displaying 'Input Rate Times & Histograms' and 'Per-Receiver Times & Histograms'.

This may lead to somewhat unfriendly graphs: once we have tens of Receivers or more, every 'Per-Receiver Times' line almost hits the ground.

This issue proposes to calculate a new maxY against the original one, which is shared among all the 'Per-Receiver Times & Histograms' graphs.

Before:
![before-5](https://cloud.githubusercontent.com/assets/15843379/11761362/d790c356-a0fa-11e5-860e-4b834603de1d.png)

After:
![after-5](https://cloud.githubusercontent.com/assets/15843379/11761361/cfabf692-a0fa-11e5-97d0-4ad124aaca2a.png)
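
A small standalone sketch of the proposed calculation (the sample rates are made up; in Spark the listener supplies `receivedEventRateWithBatchTime`): take the largest per-receiver event rate, rounded up, as the shared y-axis maximum.

```
// streamId -> sequence of (batch time, events per second)
val receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = Map(
  0 -> Seq((1000L, 12.3), (2000L, 45.6)),
  1 -> Seq((1000L, 7.8), (2000L, 9.9)))

val maxYCalculated: Long = receivedEventRateWithBatchTime.values
  .flatMap(_.map { case (_, eventRate) => eventRate })
  .reduceOption[Double](math.max)
  .map(_.ceil.toLong)
  .getOrElse(0L)
// maxYCalculated == 46, shared by every 'Per-Receiver Times' graph
```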

Author: proflin 

Closes #10318 from proflin/SPARK-12304.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d52bf47e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d52bf47e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d52bf47e

Branch: refs/heads/master
Commit: d52bf47e13e0186590437f71040100d2f6f11da9
Parents: ca0690b
Author: proflin 
Authored: Tue Dec 15 20:22:56 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 15 20:22:56 2015 -0800

--
 .../scala/org/apache/spark/streaming/ui/StreamingPage.scala  | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d52bf47e/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 88a4483..b3692c3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -392,9 +392,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
       maxX: Long,
       minY: Double,
       maxY: Double): Seq[Node] = {
+    val maxYCalculated = listener.receivedEventRateWithBatchTime.values
+      .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
+      .reduceOption[Double](math.max)
+      .map(_.ceil.toLong)
+      .getOrElse(0L)
+
     val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
       case (streamId, eventRates) =>
-        generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxY)
+        generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
     }.foldLeft[Seq[Node]](Nil)(_ ++ _)
 
 // scalastyle:off


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the shutdown hook

2015-12-13 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d7e3bfd7d -> fbf16da2e


[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the 
shutdown hook

1. Make sure workers and masters exit so that no worker or master will still be 
running when triggering the shutdown hook.
2. Set ExecutorState to FAILED if it's still RUNNING when executing the shutdown hook (a minimal sketch of this pattern follows the stack traces below).

This should fix the potential exceptions when exiting a local cluster
```
java.lang.AssertionError: assertion failed: executor 4 state transfer from 
RUNNING to RUNNING is illegal
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

java.lang.IllegalStateException: Shutdown hooks cannot be modified during 
shutdown.
at 
org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
at 
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
at 
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
at 
org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
at 
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
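
A minimal sketch of the second fix (hypothetical state holder, not Spark's ExecutorRunner): the shutdown hook downgrades a still-RUNNING executor to FAILED before reporting, so the master never sees an illegal RUNNING to RUNNING transition.

```
object ExecutorStateSketch {
  object ExecutorState extends Enumeration { val RUNNING, FAILED, KILLED, EXITED = Value }
  import ExecutorState._

  @volatile private var state = RUNNING

  private def reportToMaster(s: ExecutorState.Value): Unit = {
    // send the final state change to the master here
  }

  sys.addShutdownHook {
    if (state == RUNNING) {
      state = FAILED // never report RUNNING from a shutdown hook
    }
    reportToMaster(state)
  }
}
```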

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10269 from zsxwing/executor-state.

(cherry picked from commit 2aecda284e22ec608992b6221e2f5ffbd51fcd24)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf16da2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf16da2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf16da2

Branch: refs/heads/branch-1.6
Commit: fbf16da2e53acc8678bd1454b0749d1923d4eddf
Parents: d7e3bfd
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sun Dec 13 22:06:39 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Dec 13 22:06:56 2015 -0800

--
 .../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala  | 2 ++
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++---
 .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala   | 5 +
 3 files changed, 9 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala 
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaad..5bb62d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,6 +75,8 @@ class LocalSparkCluster(
 // Stop the workers before the master so they don't get upset that it 
disconnected
 workerRpcEnvs.foreach(_.shutdown())
 masterRpcEnvs.foreach(_.shutdown())
+workerRpcEnvs.foreach(_.awaitTermination())
+masterRpcEnvs.foreach(_.awaitTermination())
 masterRpcEnvs.clear()
 workerRpcEnvs.clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04b20e0..1355e1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ma

spark git commit: [SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the shutdown hook

2015-12-13 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 8af2f8c61 -> 2aecda284


[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the 
shutdown hook

1. Make sure workers and masters exit so that no worker or master will still be 
running when triggering the shutdown hook.
2. Set ExecutorState to FAILED if it's still RUNNING when executing the 
shutdown hook.

This should fix the potential exceptions when exiting a local cluster
```
java.lang.AssertionError: assertion failed: executor 4 state transfer from 
RUNNING to RUNNING is illegal
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

java.lang.IllegalStateException: Shutdown hooks cannot be modified during 
shutdown.
at 
org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
at 
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
at 
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
at 
org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
at 
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
at 
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at 
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10269 from zsxwing/executor-state.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aecda28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aecda28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aecda28

Branch: refs/heads/master
Commit: 2aecda284e22ec608992b6221e2f5ffbd51fcd24
Parents: 8af2f8c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sun Dec 13 22:06:39 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Dec 13 22:06:39 2015 -0800

--
 .../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala  | 2 ++
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++---
 .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala   | 5 +
 3 files changed, 9 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2aecda28/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala 
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaad..5bb62d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,6 +75,8 @@ class LocalSparkCluster(
 // Stop the workers before the master so they don't get upset that it 
disconnected
 workerRpcEnvs.foreach(_.shutdown())
 masterRpcEnvs.foreach(_.shutdown())
+workerRpcEnvs.foreach(_.awaitTermination())
+masterRpcEnvs.foreach(_.awaitTermination())
 masterRpcEnvs.clear()
 workerRpcEnvs.clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2aecda28/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04b20e0..1355e1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -257,9 +257,8 @@ private[deploy] class Master(
   exec.state = state
 
   if (state == 

spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message

2015-12-12 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 98b212d36 -> 8af2f8c61


[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message
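
A simplified sketch of the bookkeeping this adds (a plain case class stands in for Spark's RpcAddress; the handler names are illustrative): remember which advertised RpcEnv address each client socket belongs to, so connection events can also be reported against that address, not just the socket address.

```
import java.util.concurrent.ConcurrentHashMap

object RemoteAddressTracking {
  case class Address(host: String, port: Int)

  // client socket address -> remote RpcEnv's listening address
  private val remoteAddresses = new ConcurrentHashMap[Address, Address]()

  def onRequest(clientAddr: Address, remoteEnvAddress: Address): Unit = {
    if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
      // first message from this socket: also announce the remote env's listening address
    }
  }

  def onConnectionError(clientAddr: Address): Unit = {
    val remoteEnvAddress = remoteAddresses.get(clientAddr)
    if (remoteEnvAddress != null) {
      // also report the error/disconnect for the remote env's listening address
    }
  }
}
```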

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10261 from zsxwing/SPARK-12267.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8af2f8c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8af2f8c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8af2f8c6

Branch: refs/heads/master
Commit: 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8
Parents: 98b212d
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sat Dec 12 21:58:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sat Dec 12 21:58:55 2015 -0800

--
 .../spark/deploy/master/ApplicationInfo.scala   |  1 +
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 42 
 4 files changed, 65 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b7..7e2cf95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
 nextExecutorId = 0
 removedExecutors = new ArrayBuffer[ExecutorDesc]
 executorLimit = Integer.MAX_VALUE
+appUIUrlAtHistoryServer = None
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff..f41efb0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
 val conf = new SparkConf
 val args = new WorkerArguments(argStrings, conf)
 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, 
args.cores,
-  args.memory, args.masters, args.workDir)
+  args.memory, args.masters, args.workDir, conf = conf)
 rpcEnv.awaitTermination()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68c5f44..f82fd4e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected 
message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
 
+  // A variable to track the remote RpcEnv addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
   client: TransportClient,
   message: ByteBuffer,
@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler(
   // Create a new message with the socket address of the client as the 
sender.
   RequestMessage(clientAddr, requestMessage.receiver, 
requestMessage.content)
 } else {
+  // The remote RpcEnv listens to some port, we should also fire a 
RemoteProcessConnected for
+  // the listening address
+  val remoteEnvAddress = requestMessage.senderAddress
+  if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+  }
   requestMessage
 }
   }
@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler(
 if (addr != null) {
   val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
   dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+  // If the remove RpcEnv listens to some address, we should also fire a
+  // RemoteProcessConnectionError for the remote RpcEnv listening address
+ 

spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message

2015-12-12 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e05364baa -> d7e3bfd7d


[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10261 from zsxwing/SPARK-12267.

(cherry picked from commit 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7e3bfd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7e3bfd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7e3bfd7

Branch: refs/heads/branch-1.6
Commit: d7e3bfd7d33b8fba44ef80932c0d40fb68075cb4
Parents: e05364b
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sat Dec 12 21:58:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sat Dec 12 21:59:03 2015 -0800

--
 .../spark/deploy/master/ApplicationInfo.scala   |  1 +
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 42 
 4 files changed, 65 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b7..7e2cf95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
 nextExecutorId = 0
 removedExecutors = new ArrayBuffer[ExecutorDesc]
 executorLimit = Integer.MAX_VALUE
+appUIUrlAtHistoryServer = None
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff..f41efb0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
 val conf = new SparkConf
 val args = new WorkerArguments(argStrings, conf)
 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, 
args.cores,
-  args.memory, args.masters, args.workDir)
+  args.memory, args.masters, args.workDir, conf = conf)
 rpcEnv.awaitTermination()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index ed1f082..9d353bb 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -560,6 +560,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected 
message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
 
+  // A variable to track the remote RpcEnv addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
   client: TransportClient,
   message: ByteBuffer,
@@ -587,6 +590,12 @@ private[netty] class NettyRpcHandler(
   // Create a new message with the socket address of the client as the 
sender.
   RequestMessage(clientAddr, requestMessage.receiver, 
requestMessage.content)
 } else {
+  // The remote RpcEnv listens to some port, we should also fire a 
RemoteProcessConnected for
+  // the listening address
+  val remoteEnvAddress = requestMessage.senderAddress
+  if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+  }
   requestMessage
 }
   }
@@ -598,6 +607,12 @@ private[netty] class NettyRpcHandler(
 if (addr != null) {
   val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
   dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+  // If the re
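The hunk above is cut off by the archive, so here is a minimal sketch of the idea on both the connect and the disconnect side. The names (Address, ConnectionTracker, a println "dispatcher") are hypothetical stand-ins, not the real RpcAddress/NettyRpcHandler types.

    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical stand-in for RpcAddress; the tracker mirrors the bookkeeping idea only.
    case class Address(host: String, port: Int)

    class ConnectionTracker(postToAll: String => Unit) {
      // client socket address -> listening address advertised by the remote RpcEnv
      private val remoteAddresses = new ConcurrentHashMap[Address, Address]()

      def onRequest(clientAddr: Address, remoteEnvAddr: Address): Unit = {
        // putIfAbsent returns null only on the first insertion, so the "connected"
        // event fires once per remote env rather than once per message.
        if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddr) == null) {
          postToAll(s"RemoteProcessConnected($remoteEnvAddr)")
        }
      }

      def onDisconnected(clientAddr: Address): Unit = {
        // Report the disconnection against the address the rest of the system knows
        // the remote env by, not the ephemeral client socket address.
        val remoteEnvAddr = remoteAddresses.remove(clientAddr)
        if (remoteEnvAddr != null) {
          postToAll(s"RemoteProcessDisconnected($remoteEnvAddr)")
        }
      }
    }

    val tracker = new ConnectionTracker(println)
    tracker.onRequest(Address("10.0.0.5", 53214), Address("10.0.0.5", 7077))
    tracker.onDisconnected(Address("10.0.0.5", 53214))   // reports the 7077 listening address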

spark git commit: [SPARK-12273][STREAMING] Make Spark Streaming web UI list Receivers in order

2015-12-11 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master aa305dcaf -> 713e6959d


[SPARK-12273][STREAMING] Make Spark Streaming web UI list Receivers in order

Currently the Streaming web UI does not list Receivers in any particular order; it 
is more convenient for users if the Receivers are listed in ascending order of stream id.

![spark-12273](https://cloud.githubusercontent.com/assets/15843379/11736602/0bb7f7a8-a00b-11e5-8e86-96ba9297fb12.png)
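A small standalone sketch of the sorting change shown in the diff below, using made-up event-rate data keyed by stream id (the real data comes from the streaming listener):

    // Maps have no guaranteed iteration order; sorting by stream id before rendering
    // keeps the receiver rows in a stable, ascending order on the page.
    val receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = Map(
      2 -> Seq((1000L, 10.0)),
      0 -> Seq((1000L, 42.0)),
      1 -> Seq((1000L, 7.0)))

    val orderedRows = receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
      case (streamId, eventRates) => s"stream $streamId: ${eventRates.size} rate point(s)"
    }
    // orderedRows starts with stream 0, then 1, then 2, regardless of map ordering.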

Author: proflin 

Closes #10264 from proflin/Spark-12273.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/713e6959
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/713e6959
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/713e6959

Branch: refs/heads/master
Commit: 713e6959d21d24382ef99bbd7e9da751a7ed388c
Parents: aa305dc
Author: proflin 
Authored: Fri Dec 11 13:50:36 2015 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 11 13:50:36 2015 -0800

--
 .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/713e6959/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 4588b21..88a4483 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -392,8 +392,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
   maxX: Long,
   minY: Double,
   maxY: Double): Seq[Node] = {
-val content = listener.receivedEventRateWithBatchTime.map { case 
(streamId, eventRates) =>
-  generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, 
minY, maxY)
+val content = 
listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
+  case (streamId, eventRates) =>
+generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, 
minY, maxY)
 }.foldLeft[Seq[Node]](Nil)(_ ++ _)
 
 // scalastyle:off


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 c7c99857d -> 43f02e41e


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python 
API now has the same functionality as Scala/Java, so this change updates the 
description to make it more precise.

zsxwing tdas, please review, thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10246 from jerryshao/direct-kafka-doc-update.

(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f02e41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f02e41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f02e41

Branch: refs/heads/branch-1.4
Commit: 43f02e41e36256c2b90ee06d8b380ff2dfc7cabf
Parents: c7c9985
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:32:16 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43f02e41/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 79b811c..100f637 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
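For reference, a minimal Scala sketch of the direct approach described above; the broker addresses and topic name are placeholders, and a spark-streaming-kafka dependency is assumed on the classpath.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafkaSketch"), Seconds(2))

    // No receiver: the stream itself asks Kafka for the latest offsets each batch interval.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("pageviews"))

    messages.map(_._2).count().print()   // records arrive as (key, value) pairs

    ssc.start()
    ssc.awaitTermination()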
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4b99f72f7 -> cb0246c93


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python 
API now has the same functionality as Scala/Java, so this change updates the 
description to make it more precise.

zsxwing tdas, please review, thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10246 from jerryshao/direct-kafka-doc-update.

(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb0246c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb0246c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb0246c9

Branch: refs/heads/branch-1.5
Commit: cb0246c9314892dbb3403488154b2d987c90b1dd
Parents: 4b99f72
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:32:05 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb0246c9/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index ab7f011..d58f4f6 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

2015-12-10 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5030923ea -> 24d3357d6


[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc

With the merge of 
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python 
API now has the same functionality as Scala/Java, so this change updates the 
description to make it more precise.

zsxwing tdas, please review, thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10246 from jerryshao/direct-kafka-doc-update.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24d3357d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24d3357d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24d3357d

Branch: refs/heads/master
Commit: 24d3357d66e14388faf8709b368edca70ea96432
Parents: 5030923
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:31:46 2015 -0800

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24d3357d/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index b00351b..5be73c4 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming 
application.
[Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
 
 This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
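The offset ranges mentioned above are visible per batch; a sketch of reading them follows, again with a placeholder broker and topic and spark-streaming-kafka assumed on the classpath.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafkaOffsets"), Seconds(2))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("pageviews"))

    stream.foreachRDD { rdd =>
      // With the direct approach, each RDD partition corresponds to exactly one
      // Kafka topic+partition offset range for that batch.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()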
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 699f497cf -> f6d866173


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d86617/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
--
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
deleted file mode 100644
index eac4cdd..000
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.util.ManualClock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
-
-public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext 
implements Serializable {
-
-  /**
-   * This test is only for testing the APIs. It's not necessary to run it.
-   */
-  public void testAPI() {
-JavaPairRDD initialRDD = null;
-JavaPairDStream wordsDstream = null;
-
-final Function4
-trackStateFunc =
-new Function4() {
-
-  @Override
-  public Optional call(
-  Time time, String word, Optional one, State 
state) {
-// Use all State's methods here
-state.exists();
-state.get();
-state.isTimingOut();
-state.remove();
-state.update(true);
-return Optional.of(2.0);
-  }
-};
-
-JavaTrackStateDStream stateDstream =
-wordsDstream.trackStateByKey(
-StateSpec.function(trackStateFunc)
-.initialState(initialRDD)
-.numPartitions(10)
-.partitioner(new HashPartitioner(10))
-.timeout(Durations.seconds(10)));
-
-JavaPairDStream emittedRecords = 
stateDstream.stateSnapshots();
-
-final Function2 trackStateFunc2 
=
-new Function2() {
-
-  @Override
-  public Double call(Optional one, State state) {
-// Use all State's methods here
-state.exists();
-state.get();
-state.isTimingOut();
-state.remove();
-state.update(true);
-return 2.0;
-  }
-};
-
-JavaTrackStateDStream stateDstream2 =
-wordsDstream.trackStateByKey(
-StateSpec. 
function(trackStateFunc2)
-.initialState(initialRDD)
-.numPartitions(10)
-.partitioner(new HashPartitioner(10))
-.timeout(Durations.seconds(10)));
-
-JavaPairDStream emittedRecords2 = 
stateDstream2.stateSnapshots();
-  }
-
-  @Test
-  public void testBasicFunction() {
-List inputData = Arrays.asList(
-Collections.emptyList(),
-Arrays.asList("a"),
-Arrays.asList("a", "b"),
-Arrays.asList("a", "b", "c"),
-Arrays.asList("a", "b"),
-Arrays.asList("a"),
-Collections.emptyList()
-);
-
-List outputData = Arrays.asList(
-

[2/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState 
and change tracking function signature

SPARK-12244:

Based on feedback from early users and personal experience attempting to 
explain it, the name trackStateByKey had two problems. First, "trackState" is a 
completely new term that gives no intuition about what the operation does. 
Second, the resultant data stream of objects returned by the function is referred 
to in the docs as the "emitted" data, for lack of a better term. 
"mapWithState" makes sense because the API is like a mapping function of the form 
(Key, Value) => T with State as an additional parameter, and the resultant data 
stream is simply the "mapped data". So both problems are solved.

SPARK-12245:

From initial experience, not having the key in the function makes it hard to 
return mapped records, since the full information of each record is not available; 
the user is essentially restricted to something like mapValues() instead of 
map(). So the key is added as a parameter.
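A short Scala sketch of the renamed API; the wordCounts stream of (word, 1) pairs is assumed to be built elsewhere in the application.

    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    // The mapping function now receives the key and returns the mapped record directly,
    // instead of an Optional "emitted" value.
    def runningCounts(wordCounts: DStream[(String, Int)]): DStream[(String, Int)] = {
      val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)   // the per-key state holds the running total
        (word, sum)         // each returned pair is an element of the mapped stream
      }
      wordCounts.mapWithState(StateSpec.function(mappingFunc))
    }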

Author: Tathagata Das 

Closes #10224 from tdas/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d86617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d86617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d86617

Branch: refs/heads/branch-1.6
Commit: f6d8661738b5a4b139c4800d5c4e9f0094068451
Parents: 699f497
Author: Tathagata Das 
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 9 20:59:21 2015 -0800

--
 .../streaming/JavaStatefulNetworkWordCount.java |  16 +-
 .../streaming/StatefulNetworkWordCount.scala|  12 +-
 .../apache/spark/streaming/Java8APISuite.java   |  18 +-
 .../org/apache/spark/streaming/State.scala  |  20 +-
 .../org/apache/spark/streaming/StateSpec.scala  | 160 ++---
 .../api/java/JavaMapWithStateDStream.scala  |  44 ++
 .../streaming/api/java/JavaPairDStream.scala|  50 +-
 .../api/java/JavaTrackStateDStream.scala|  44 --
 .../streaming/dstream/MapWithStateDStream.scala | 170 ++
 .../dstream/PairDStreamFunctions.scala  |  41 +-
 .../streaming/dstream/TrackStateDStream.scala   | 171 --
 .../spark/streaming/rdd/MapWithStateRDD.scala   | 223 +++
 .../spark/streaming/rdd/TrackStateRDD.scala | 228 
 .../spark/streaming/JavaMapWithStateSuite.java  | 210 +++
 .../streaming/JavaTrackStateByKeySuite.java | 210 ---
 .../spark/streaming/MapWithStateSuite.scala | 581 +++
 .../spark/streaming/TrackStateByKeySuite.scala  | 581 ---
 .../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
 .../streaming/rdd/TrackStateRDDSuite.scala  | 389 -
 19 files changed, 1782 insertions(+), 1775 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d86617/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));
 ssc.checkpoint(".");
 
-// Initial RDD input to trackStateByKey
+// Initial state RDD input to mapWithState
 @SuppressWarnings("unchecked")
 List> tuples = Arrays.asList(new Tuple2("hello", 1),
 new Tuple2("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
 });
 
 // Update the cumulative count function
-final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
-new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
+final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {

  @Override
-  public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
+  public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
 int sum = one.or(0) + (state.exists() ? state.get() : 0);
 Tuple2 output = new Tuple2(word, 

[2/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState 
and change tracking function signature

SPARK-12244:

Based on feedback from early users and personal experience attempting to 
explain it, the name trackStateByKey had two problems. First, "trackState" is a 
completely new term that gives no intuition about what the operation does. 
Second, the resultant data stream of objects returned by the function is referred 
to in the docs as the "emitted" data, for lack of a better term. 
"mapWithState" makes sense because the API is like a mapping function of the form 
(Key, Value) => T with State as an additional parameter, and the resultant data 
stream is simply the "mapped data". So both problems are solved.

SPARK-12245:

From initial experience, not having the key in the function makes it hard to 
return mapped records, since the full information of each record is not available; 
the user is essentially restricted to something like mapValues() instead of 
map(). So the key is added as a parameter.
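The StateSpec builder carries the configuration that used to hang off trackStateByKey; a sketch of wiring one up follows (the seed data, partition counts and timeout below are illustrative):

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.streaming.{Durations, State, StateSpec}

    def buildSpec(sc: SparkContext): StateSpec[String, Int, Int, (String, Int)] = {
      val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)
        (word, sum)
      }
      StateSpec.function(mappingFunc)
        .initialState(sc.parallelize(Seq(("hello", 1), ("world", 1))))  // seed per-key state
        .numPartitions(10)                                              // partitioning of the state RDDs
        .partitioner(new HashPartitioner(10))
        .timeout(Durations.seconds(30))                                 // idle keys get flagged as timing out
    }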

Author: Tathagata Das 

Closes #10224 from tdas/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2cd4f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2cd4f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2cd4f5

Branch: refs/heads/master
Commit: bd2cd4f53d1ca10f4896bd39b0e180d4929867a2
Parents: 2166c2a
Author: Tathagata Das 
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 9 20:47:15 2015 -0800

--
 .../streaming/JavaStatefulNetworkWordCount.java |  16 +-
 .../streaming/StatefulNetworkWordCount.scala|  12 +-
 .../apache/spark/streaming/Java8APISuite.java   |  18 +-
 .../org/apache/spark/streaming/State.scala  |  20 +-
 .../org/apache/spark/streaming/StateSpec.scala  | 160 ++---
 .../api/java/JavaMapWithStateDStream.scala  |  44 ++
 .../streaming/api/java/JavaPairDStream.scala|  50 +-
 .../api/java/JavaTrackStateDStream.scala|  44 --
 .../streaming/dstream/MapWithStateDStream.scala | 170 ++
 .../dstream/PairDStreamFunctions.scala  |  41 +-
 .../streaming/dstream/TrackStateDStream.scala   | 171 --
 .../spark/streaming/rdd/MapWithStateRDD.scala   | 223 +++
 .../spark/streaming/rdd/TrackStateRDD.scala | 228 
 .../spark/streaming/JavaMapWithStateSuite.java  | 210 +++
 .../streaming/JavaTrackStateByKeySuite.java | 210 ---
 .../spark/streaming/MapWithStateSuite.scala | 581 +++
 .../spark/streaming/TrackStateByKeySuite.scala  | 581 ---
 .../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
 .../streaming/rdd/TrackStateRDDSuite.scala  | 389 -
 19 files changed, 1782 insertions(+), 1775 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2cd4f5/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));
 ssc.checkpoint(".");
 
-// Initial RDD input to trackStateByKey
+// Initial state RDD input to mapWithState
 @SuppressWarnings("unchecked")
 List> tuples = Arrays.asList(new Tuple2("hello", 1),
 new Tuple2("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
 });
 
 // Update the cumulative count function
-final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
-new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
+final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {

  @Override
-  public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
+  public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
 int sum = one.or(0) + (state.exists() ? state.get() : 0);
 Tuple2 output = new Tuple2(word, 

[1/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 2166c2a75 -> bd2cd4f53


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2cd4f5/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
--
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
deleted file mode 100644
index 89d0bb7..000
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.util.ManualClock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
-
-public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext 
implements Serializable {
-
-  /**
-   * This test is only for testing the APIs. It's not necessary to run it.
-   */
-  public void testAPI() {
-JavaPairRDD initialRDD = null;
-JavaPairDStream wordsDstream = null;
-
-final Function4
-trackStateFunc =
-new Function4() {
-
-  @Override
-  public Optional call(
-  Time time, String word, Optional one, State 
state) {
-// Use all State's methods here
-state.exists();
-state.get();
-state.isTimingOut();
-state.remove();
-state.update(true);
-return Optional.of(2.0);
-  }
-};
-
-JavaTrackStateDStream stateDstream =
-wordsDstream.trackStateByKey(
-StateSpec.function(trackStateFunc)
-.initialState(initialRDD)
-.numPartitions(10)
-.partitioner(new HashPartitioner(10))
-.timeout(Durations.seconds(10)));
-
-JavaPairDStream emittedRecords = 
stateDstream.stateSnapshots();
-
-final Function2 trackStateFunc2 
=
-new Function2() {
-
-  @Override
-  public Double call(Optional one, State state) {
-// Use all State's methods here
-state.exists();
-state.get();
-state.isTimingOut();
-state.remove();
-state.update(true);
-return 2.0;
-  }
-};
-
-JavaTrackStateDStream stateDstream2 =
-wordsDstream.trackStateByKey(
-StateSpec.function(trackStateFunc2)
-.initialState(initialRDD)
-.numPartitions(10)
-.partitioner(new HashPartitioner(10))
-.timeout(Durations.seconds(10)));
-
-JavaPairDStream emittedRecords2 = 
stateDstream2.stateSnapshots();
-  }
-
-  @Test
-  public void testBasicFunction() {
-List inputData = Arrays.asList(
-Collections.emptyList(),
-Arrays.asList("a"),
-Arrays.asList("a", "b"),
-Arrays.asList("a", "b", "c"),
-Arrays.asList("a", "b"),
-Arrays.asList("a"),
-Collections.emptyList()
-);
-
-List outputData = Arrays.asList(
-

spark git commit: [SPARK-12074] Avoid memory copy involving ByteBuffer.wrap(ByteArrayOutputStream.toByteArray)

2015-12-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 6cb06e871 -> 75c60bf4b


[SPARK-12074] Avoid memory copy involving 
ByteBuffer.wrap(ByteArrayOutputStream.toByteArray)

SPARK-12060 fixed JavaSerializerInstance.serialize. This PR applies the same 
technique to two other classes.

zsxwing

Author: tedyu <yuzhih...@gmail.com>

Closes #10177 from tedyu/master.
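The trick, in isolation: ByteArrayOutputStream.toByteArray copies the internal buffer, whereas a subclass can wrap that buffer directly. The class name below is illustrative; the real ByteBufferOutputStream is in the diff further down.

    import java.io.ByteArrayOutputStream
    import java.nio.ByteBuffer

    // `buf` and `count` are protected fields of ByteArrayOutputStream.
    class ZeroCopyOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) {
      def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
    }

    val out = new ZeroCopyOutputStream(4096)
    out.write(Array[Byte](1, 2, 3))
    val copied  = ByteBuffer.wrap(out.toByteArray)  // toByteArray allocates and copies count bytes
    val wrapped = out.toByteBuffer                  // shares the internal buffer, no copy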


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75c60bf4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75c60bf4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75c60bf4

Branch: refs/heads/master
Commit: 75c60bf4ba91e45e76a6e27f054a1c550eb6ff94
Parents: 6cb06e8
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Dec 8 10:01:44 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 8 10:01:44 2015 -0800

--
 core/src/main/scala/org/apache/spark/scheduler/Task.scala | 7 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala| 4 ++--
 .../scala/org/apache/spark/util/ByteBufferOutputStream.scala  | 4 +++-
 3 files changed, 8 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/scheduler/Task.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5fe5ae8..d4bc3a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -27,8 +27,7 @@ import org.apache.spark.{Accumulator, SparkEnv, 
TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
 
 
 /**
@@ -172,7 +171,7 @@ private[spark] object Task {
   serializer: SerializerInstance)
 : ByteBuffer = {
 
-val out = new ByteArrayOutputStream(4096)
+val out = new ByteBufferOutputStream(4096)
 val dataOut = new DataOutputStream(out)
 
 // Write currentFiles
@@ -193,7 +192,7 @@ private[spark] object Task {
 dataOut.flush()
 val taskBytes = serializer.serialize(task)
 Utils.writeByteBuffer(taskBytes, out)
-ByteBuffer.wrap(out.toByteArray)
+out.toByteBuffer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..ed05143 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1202,9 +1202,9 @@ private[spark] class BlockManager(
   blockId: BlockId,
   values: Iterator[Any],
   serializer: Serializer = defaultSerializer): ByteBuffer = {
-val byteStream = new ByteArrayOutputStream(4096)
+val byteStream = new ByteBufferOutputStream(4096)
 dataSerializeStream(blockId, byteStream, values, serializer)
-ByteBuffer.wrap(byteStream.toByteArray)
+byteStream.toByteBuffer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/75c60bf4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
index 92e4522..8527e3a 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -23,7 +23,9 @@ import java.nio.ByteBuffer
 /**
  * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
  */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+private[spark] class ByteBufferOutputStream(capacity: Int) extends 
ByteArrayOutputStream(capacity) {
+
+  def this() = this(32)
 
   def toByteBuffer: ByteBuffer = {
 return ByteBuffer.wrap(buf, 0, count)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

2015-12-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5d80d8c6a -> 3f4efb5c2


[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

Merged #10051 again since #10083 is resolved.

This reverts commit 328b757d5d4486ea3c2e246780792d7a57ee85e5.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10167 from zsxwing/merge-SPARK-12060.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f4efb5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f4efb5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f4efb5c

Branch: refs/heads/master
Commit: 3f4efb5c23b029496b112760fa062ff070c20334
Parents: 5d80d8c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 7 12:01:09 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Dec 7 12:01:09 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 ++---
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 34 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+  }
+}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient (backport 1.5)

2015-12-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 93a0510a5 -> 3868ab644


[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and 
AppClient (backport 1.5)

Backport of #10108 to branch-1.5.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10135 from zsxwing/fix-threadpool-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3868ab64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3868ab64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3868ab64

Branch: refs/heads/branch-1.5
Commit: 3868ab644cc87ce68ba1605f6da65c5e951ce412
Parents: 93a0510
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 7 12:04:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Dec 7 12:04:18 2015 -0800

--
 .../apache/spark/deploy/client/AppClient.scala   | 10 --
 .../org/apache/spark/deploy/worker/Worker.scala  | 10 --
 .../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +--
 3 files changed, 13 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 25ea692..bd28429 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -66,12 +66,10 @@ private[spark] class AppClient(
 // A thread pool for registering with masters. Because registering with a 
master is a blocking
 // action, this thread pool must be able to create 
"masterRpcAddresses.size" threads at the same
 // time so that we can register with all masters.
-private val registerMasterThreadPool = new ThreadPoolExecutor(
-  0,
-  masterRpcAddresses.size, // Make sure we can register with all masters 
at the same time
-  60L, TimeUnit.SECONDS,
-  new SynchronousQueue[Runnable](),
-  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+private val registerMasterThreadPool = 
ThreadUtils.newDaemonCachedThreadPool(
+  "appclient-register-master-threadpool",
+  masterRpcAddresses.length // Make sure we can register with all masters 
at the same time
+)
 
 // A scheduled executor for scheduling the registration actions
 private val registrationRetryThread =

http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 79b1536..a898bb1 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -147,12 +147,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a 
master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" 
threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-0,
-masterRpcAddresses.size, // Make sure we can register with all masters at 
the same time
-60L, TimeUnit.SECONDS,
-new SynchronousQueue[Runnable](),
-ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+"worker-register-master-threadpool",
+masterRpcAddresses.size // Make sure we can register with all masters at 
the same time
+  )
 
   var coresUsed = 0
   var memoryUsed = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6a02848..52a3fd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,26 +21,21 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
-import org.apache.spark.util.Utils
-
 import scala.collection.JavaConvers

spark git commit: [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient

2015-12-03 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 8865d87f7 -> 4c84f6e91


[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and 
AppClient

`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's 
an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
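For reference, a sketch outside Spark of what a "cached but capped" daemon pool looks like with plain java.util.concurrent; ThreadUtils.newDaemonCachedThreadPool is along these lines, but the helper name and sizes here are made up.

    import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    def newCappedDaemonPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
      val counter = new AtomicInteger(0)
      val factory = new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, prefix + "-" + counter.incrementAndGet())
          t.setDaemon(true)
          t
        }
      }
      // core == max with core-thread timeout: up to maxThreads run concurrently and
      // further submissions are queued, instead of being rejected as they are with
      // corePoolSize = 0 plus a SynchronousQueue.
      val pool = new ThreadPoolExecutor(
        maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue[Runnable](), factory)
      pool.allowCoreThreadTimeOut(true)
      pool
    }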

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10108 from zsxwing/fix-threadpool.

(cherry picked from commit 649be4fa4532dcd3001df8345f9f7e970a3fbc65)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c84f6e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c84f6e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c84f6e9

Branch: refs/heads/branch-1.6
Commit: 4c84f6e91d61a358c179b04bf6d1bc8b9559b6d0
Parents: 8865d87
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 3 11:06:25 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 3 11:06:46 2015 -0800

--
 .../org/apache/spark/deploy/client/AppClient.scala| 10 --
 .../scala/org/apache/spark/deploy/worker/Worker.scala | 10 --
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala  | 14 --
 3 files changed, 12 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index df6ba7d..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -68,12 +68,10 @@ private[spark] class AppClient(
 // A thread pool for registering with masters. Because registering with a 
master is a blocking
 // action, this thread pool must be able to create 
"masterRpcAddresses.size" threads at the same
 // time so that we can register with all masters.
-private val registerMasterThreadPool = new ThreadPoolExecutor(
-  0,
-  masterRpcAddresses.length, // Make sure we can register with all masters 
at the same time
-  60L, TimeUnit.SECONDS,
-  new SynchronousQueue[Runnable](),
-  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+private val registerMasterThreadPool = 
ThreadUtils.newDaemonCachedThreadPool(
+  "appclient-register-master-threadpool",
+  masterRpcAddresses.length // Make sure we can register with all masters 
at the same time
+)
 
 // A scheduled executor for scheduling the registration actions
 private val registrationRetryThread =

http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 418faf8..1afc1ff 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -146,12 +146,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a 
master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" 
threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-0,
-masterRpcAddresses.size, // Make sure we can register with all masters at 
the same time
-60L, TimeUnit.SECONDS,
-new SynchronousQueue[Runnable](),
-ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+"worker-register-master-threadpool",
+masterRpcAddresses.size // Make sure we can register with all masters at 
the same time
+  )
 
   var coresUsed = 0
   var memoryUsed = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd903..4e044aa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/de

spark git commit: [SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient

2015-12-03 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 7bc9e1db2 -> 649be4fa4


[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and 
AppClient

`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's 
an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
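To see why the old configuration could not "cache" a task, here is a standalone sketch of the failure mode (the pool size and sleep below are arbitrary):

    import java.util.concurrent.{RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

    // corePoolSize = 0, a capped max, and a SynchronousQueue (capacity 0): once all
    // threads are busy there is nowhere to park a new task, so it gets rejected.
    val pool = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
    val busy = new Runnable { override def run(): Unit = Thread.sleep(1000) }

    pool.execute(busy)
    pool.execute(busy)
    try {
      pool.execute(busy)   // third task while both threads are still busy
    } catch {
      case e: RejectedExecutionException => println("rejected: " + e)
    }
    pool.shutdown()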

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10108 from zsxwing/fix-threadpool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649be4fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649be4fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649be4fa

Branch: refs/heads/master
Commit: 649be4fa4532dcd3001df8345f9f7e970a3fbc65
Parents: 7bc9e1d
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 3 11:06:25 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 3 11:06:25 2015 -0800

--
 .../org/apache/spark/deploy/client/AppClient.scala| 10 --
 .../scala/org/apache/spark/deploy/worker/Worker.scala | 10 --
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala  | 14 --
 3 files changed, 12 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index df6ba7d..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -68,12 +68,10 @@ private[spark] class AppClient(
 // A thread pool for registering with masters. Because registering with a 
master is a blocking
 // action, this thread pool must be able to create 
"masterRpcAddresses.size" threads at the same
 // time so that we can register with all masters.
-private val registerMasterThreadPool = new ThreadPoolExecutor(
-  0,
-  masterRpcAddresses.length, // Make sure we can register with all masters 
at the same time
-  60L, TimeUnit.SECONDS,
-  new SynchronousQueue[Runnable](),
-  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+private val registerMasterThreadPool = 
ThreadUtils.newDaemonCachedThreadPool(
+  "appclient-register-master-threadpool",
+  masterRpcAddresses.length // Make sure we can register with all masters 
at the same time
+)
 
 // A scheduled executor for scheduling the registration actions
 private val registrationRetryThread =

http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 418faf8..1afc1ff 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -146,12 +146,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a 
master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" 
threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-0,
-masterRpcAddresses.size, // Make sure we can register with all masters at 
the same time
-60L, TimeUnit.SECONDS,
-new SynchronousQueue[Runnable](),
-ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+"worker-register-master-threadpool",
+masterRpcAddresses.size // Make sure we can register with all masters at 
the same time
+  )
 
   var coresUsed = 0
   var memoryUsed = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd903..4e044aa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable
 import scala.collection.mutable.{A

spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 81db8d086 -> 21909b8ac


Revert "[SPARK-12060][CORE] Avoid memory copy in 
JavaSerializerInstance.serialize"

This reverts commit 9b99b2b46c452ba396e922db5fc7eec02c45b158.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21909b8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21909b8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21909b8a

Branch: refs/heads/branch-1.6
Commit: 21909b8ac0068658cc833f324c0f1f418c200d61
Parents: 81db8d0
Author: Shixiong Zhu 
Authored: Tue Dec 1 15:16:07 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:16:07 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 +++--
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 4 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteBufferOutputStream()
+val bos = new ByteArrayOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-bos.toByteBuffer
+ByteBuffer.wrap(bos.toByteArray)
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
-  def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
-  }
-}





spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 60b541ee1 -> 328b757d5


Revert "[SPARK-12060][CORE] Avoid memory copy in 
JavaSerializerInstance.serialize"

This reverts commit 1401166576c7018c5f9c31e0a6703d5fb16ea339.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/328b757d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/328b757d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/328b757d

Branch: refs/heads/master
Commit: 328b757d5d4486ea3c2e246780792d7a57ee85e5
Parents: 60b541e
Author: Shixiong Zhu 
Authored: Tue Dec 1 15:13:10 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:13:10 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 +++--
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 4 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteBufferOutputStream()
+val bos = new ByteArrayOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-bos.toByteBuffer
+ByteBuffer.wrap(bos.toByteArray)
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
-  def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
-  }
-}





spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5647774b0 -> 012de2ce5


[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery 
issue

Fixed a minor race condition in #10017

Closes #10017

Author: jerryshao <ss...@hortonworks.com>
Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10074 from zsxwing/review-pr10017.

(cherry picked from commit f292018f8e57779debc04998456ec875f628133b)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/012de2ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/012de2ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/012de2ce

Branch: refs/heads/branch-1.6
Commit: 012de2ce5de01bc57197fa26334fc175c8f20233
Parents: 5647774
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Dec 1 15:26:10 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 15:26:20 2015 -0800

--
 python/pyspark/streaming/tests.py | 49 ++
 python/pyspark/streaming/util.py  | 13 -
 2 files changed, 56 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index d380d69..a2bfd79 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
 @unittest.skipIf(sys.version >= "3", "long type not support")
+def test_kafka_direct_stream_transform_with_checkpoint(self):
+"""Test the Python direct Kafka stream transform with checkpoint 
correctly recovered."""
+topic = self._randomTopic()
+sendData = {"a": 1, "b": 2, "c": 3}
+kafkaParams = {"metadata.broker.list": 
self._kafkaTestUtils.brokerAddress(),
+   "auto.offset.reset": "smallest"}
+
+self._kafkaTestUtils.createTopic(topic)
+self._kafkaTestUtils.sendMessages(topic, sendData)
+
+offsetRanges = []
+
+def transformWithOffsetRanges(rdd):
+for o in rdd.offsetRanges():
+offsetRanges.append(o)
+return rdd
+
+self.ssc.stop(False)
+self.ssc = None
+tmpdir = "checkpoint-test-%d" % random.randint(0, 1)
+
+def setup():
+ssc = StreamingContext(self.sc, 0.5)
+ssc.checkpoint(tmpdir)
+stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
+stream.transform(transformWithOffsetRanges).count().pprint()
+return ssc
+
+try:
+ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc1.start()
+self.wait_for(offsetRanges, 1)
+self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), 
long(6))])
+
+# To make sure some checkpoint is written
+time.sleep(3)
+ssc1.stop(False)
+ssc1 = None
+
+# Restart again to make sure the checkpoint is recovered correctly
+ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc2.start()
+ssc2.awaitTermination(3)
+ssc2.stop(stopSparkContext=False, stopGraceFully=True)
+ssc2 = None
+finally:
+shutil.rmtree(tmpdir)
+
+@unittest.skipIf(sys.version >= "3", "long type not support")
 def test_kafka_rdd_message_handler(self):
 """Test Python direct Kafka RDD MessageHandler."""
 topic = self._randomTopic()

http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/util.py
--
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index c7f02bc..abbbf6e 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -37,11 +37,11 @@ class TransformFunction(object):
 self.ctx = ctx
 self.func = func
 self.deserializers = deserializers
-self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
 self.failure = None
 
 def rdd_wrapper(self, func):
-self._rdd_wrapper = func
+self.rdd_wrap_func = func
 return self
 
 def call(self, milliseconds

spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e76431f88 -> f292018f8


[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery 
issue

Fixed a minor race condition in #10017

Closes #10017

Author: jerryshao <ss...@hortonworks.com>
Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10074 from zsxwing/review-pr10017.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f292018f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f292018f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f292018f

Branch: refs/heads/master
Commit: f292018f8e57779debc04998456ec875f628133b
Parents: e76431f
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Dec 1 15:26:10 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 15:26:10 2015 -0800

--
 python/pyspark/streaming/tests.py | 49 ++
 python/pyspark/streaming/util.py  | 13 -
 2 files changed, 56 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index a647e6b..d50c6b8 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
 @unittest.skipIf(sys.version >= "3", "long type not support")
+def test_kafka_direct_stream_transform_with_checkpoint(self):
+"""Test the Python direct Kafka stream transform with checkpoint 
correctly recovered."""
+topic = self._randomTopic()
+sendData = {"a": 1, "b": 2, "c": 3}
+kafkaParams = {"metadata.broker.list": 
self._kafkaTestUtils.brokerAddress(),
+   "auto.offset.reset": "smallest"}
+
+self._kafkaTestUtils.createTopic(topic)
+self._kafkaTestUtils.sendMessages(topic, sendData)
+
+offsetRanges = []
+
+def transformWithOffsetRanges(rdd):
+for o in rdd.offsetRanges():
+offsetRanges.append(o)
+return rdd
+
+self.ssc.stop(False)
+self.ssc = None
+tmpdir = "checkpoint-test-%d" % random.randint(0, 1)
+
+def setup():
+ssc = StreamingContext(self.sc, 0.5)
+ssc.checkpoint(tmpdir)
+stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
+stream.transform(transformWithOffsetRanges).count().pprint()
+return ssc
+
+try:
+ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc1.start()
+self.wait_for(offsetRanges, 1)
+self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), 
long(6))])
+
+# To make sure some checkpoint is written
+time.sleep(3)
+ssc1.stop(False)
+ssc1 = None
+
+# Restart again to make sure the checkpoint is recovered correctly
+ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc2.start()
+ssc2.awaitTermination(3)
+ssc2.stop(stopSparkContext=False, stopGraceFully=True)
+ssc2 = None
+finally:
+shutil.rmtree(tmpdir)
+
+@unittest.skipIf(sys.version >= "3", "long type not support")
 def test_kafka_rdd_message_handler(self):
 """Test Python direct Kafka RDD MessageHandler."""
 topic = self._randomTopic()

http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/util.py
--
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index c7f02bc..abbbf6e 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -37,11 +37,11 @@ class TransformFunction(object):
 self.ctx = ctx
 self.func = func
 self.deserializers = deserializers
-self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
 self.failure = None
 
 def rdd_wrapper(self, func):
-self._rdd_wrapper = func
+self.rdd_wrap_func = func
 return self
 
 def call(self, milliseconds, jrdds):
@@ -59,7 +59,7 @@ class TransformFunction(object):
 if len(sers) < len(jrdds):
   

spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 add4e6311 -> 9b99b2b46


[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data. `ByteArrayOutputStream.toByteArray` copies the content
of the internal array into a new array. However, since the array is immediately
wrapped in a `ByteBuffer`, that copy can be avoided.

This PR adds `ByteBufferOutputStream`, which accesses the protected `buf` field
and wraps it in a `ByteBuffer` directly.
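
As a minimal, self-contained sketch of the idea (illustrative only; the class
and object names below are made up, and this is not the Spark code itself):
`ByteArrayOutputStream` keeps its data in the protected `buf` and `count`
fields, so a subclass can wrap them in a `ByteBuffer` without the extra copy
that `toByteArray` performs.

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Illustrative subclass: expose the internal buffer as a ByteBuffer without copying it.
class WrappingByteArrayOutputStream extends ByteArrayOutputStream {
  // buf and count are protected fields of java.io.ByteArrayOutputStream.
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

object ZeroCopyDemo {
  def main(args: Array[String]): Unit = {
    val bos = new WrappingByteArrayOutputStream
    bos.write(Array[Byte](1, 2, 3))
    // toByteArray would allocate and copy `count` bytes; toByteBuffer only wraps them.
    val buffer: ByteBuffer = bos.toByteBuffer
    println(buffer.remaining()) // 3
  }
}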

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10051 from zsxwing/SPARK-12060.

(cherry picked from commit 1401166576c7018c5f9c31e0a6703d5fb16ea339)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b99b2b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b99b2b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b99b2b4

Branch: refs/heads/branch-1.6
Commit: 9b99b2b46c452ba396e922db5fc7eec02c45b158
Parents: add4e63
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 09:46:07 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 ++---
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 34 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+  }
+}





spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c87531b76 -> 140116657


[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data. `ByteArrayOutputStream.toByteArray` copies the content
of the internal array into a new array. However, since the array is immediately
wrapped in a `ByteBuffer`, that copy can be avoided.

This PR adds `ByteBufferOutputStream`, which accesses the protected `buf` field
and wraps it in a `ByteBuffer` directly.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10051 from zsxwing/SPARK-12060.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14011665
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14011665
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14011665

Branch: refs/heads/master
Commit: 1401166576c7018c5f9c31e0a6703d5fb16ea339
Parents: c87531b
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 09:45:55 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 ++---
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 34 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to 
ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+  }
+}





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 96691feae -> 8a75a3049


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (one thread updating the JobConf while another thread
is serializing it) can lead to a ConcurrentModificationException in the
underlying Java hash map used by the internal Hadoop Configuration object.

The solution is to create a new JobConf for every batch: that copy is updated by
`RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf.

Tests to be added in #9988 fail reliably without this patch. The patch is kept
deliberately small so that it can be added to previous branches.
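
A minimal sketch of the defensive-copy idea outside of Spark (the object and
method names below are only illustrative, not the patched code): keep one
shared JobConf that is serialized with the checkpoint, and give each batch its
own `new JobConf(shared)` copy before anything mutates it.

import org.apache.hadoop.mapred.JobConf

object JobConfPerBatchSketch {
  // Serialized as part of the checkpoint; must never be mutated concurrently.
  val sharedConf = new JobConf()

  def saveOneBatch(outputDir: String): Unit = {
    // Defensive copy: the Hadoop output job is free to mutate this instance
    // while the original sharedConf is being serialized on another thread.
    val perBatchConf = new JobConf(sharedConf)
    perBatchConf.set("mapreduce.output.fileoutputformat.outputdir", outputDir)
    // ... hand perBatchConf to the Hadoop output job / saveAsHadoopFile here
  }
}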

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a75a304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a75a304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a75a304

Branch: refs/heads/master
Commit: 8a75a3049539eeef04c0db51736e97070c162b46
Parents: 96691fe
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:04:52 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8a75a304/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f07a590c -> 0d57a4ae1


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (one thread updating the JobConf while another thread
is serializing it) can lead to a ConcurrentModificationException in the
underlying Java hash map used by the internal Hadoop Configuration object.

The solution is to create a new JobConf for every batch: that copy is updated by
`RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf.

Tests to be added in #9988 fail reliably without this patch. The patch is kept
deliberately small so that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d57a4ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d57a4ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d57a4ae

Branch: refs/heads/branch-1.5
Commit: 0d57a4ae10f4ec40386194bc3c8e27f32da09d4d
Parents: 4f07a59
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:18 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d57a4ae/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 71bec96..aa36997 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -692,7 +692,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a5743affc -> 1f42295b5


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (one thread updating the JobConf while another thread
is serializing it) can lead to a ConcurrentModificationException in the
underlying Java hash map used by the internal Hadoop Configuration object.

The solution is to create a new JobConf for every batch: that copy is updated by
`RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf.

Tests to be added in #9988 fail reliably without this patch. The patch is kept
deliberately small so that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f42295b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f42295b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f42295b

Branch: refs/heads/branch-1.6
Commit: 1f42295b5df69a6039ed2ba8ea67a8e57d77644d
Parents: a5743af
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:02 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f42295b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f5af299ab -> b6ba2dab2


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (one thread updating the JobConf while another thread
is serializing it) can lead to a ConcurrentModificationException in the
underlying Java hash map used by the internal Hadoop Configuration object.

The solution is to create a new JobConf for every batch: that copy is updated by
`RDD.saveAsHadoopFile()`, while checkpointing serializes the original JobConf.

Tests to be added in #9988 fail reliably without this patch. The patch is kept
deliberately small so that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6ba2dab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6ba2dab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6ba2dab

Branch: refs/heads/branch-1.4
Commit: b6ba2dab26092f56271114aa62f25b2fc9d6adad
Parents: f5af299
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:37 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b6ba2dab/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c6..4e392f5 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -691,7 +691,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableWritable(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12058][HOTFIX] Disable KinesisStreamTests

2015-11-30 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ecc00ec3f -> edb26e7f4


[SPARK-12058][HOTFIX] Disable KinesisStreamTests

KinesisStreamTests in tests.py is broken because of #9403. See
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/46896/testReport/(root)/KinesisStreamTests/test_kinesis_stream/

Because Python Streaming did not work when
https://github.com/apache/spark/pull/9403 was merged, the PR build did not
actually report the Python test failure.

This PR simply disables the test to unblock #10039.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10047 from zsxwing/disable-python-kinesis-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edb26e7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edb26e7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edb26e7f

Branch: refs/heads/master
Commit: edb26e7f4e1164645971c9a139eb29ddec8acc5d
Parents: ecc00ec
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Nov 30 16:31:59 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Nov 30 16:31:59 2015 -0800

--
 python/pyspark/streaming/tests.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/edb26e7f/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index d380d69..a647e6b 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1409,6 +1409,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
 InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
 "awsAccessKey", "awsSecretKey")
 
+@unittest.skip("Enable it when we fix SPAKR-12058")
 def test_kinesis_stream(self):
 if not are_kinesis_tests_enabled:
 sys.stderr.write(





spark git commit: [SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite

2015-11-27 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 dfc98fac9 -> 996635793


[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in 
StreamingListenerSuite

In StreamingListenerSuite."don't call ssc.stop in listener", after the main
thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may also call
`ssc.stop()` on the listener bus thread, which results in a deadlock. This PR
updates `StreamingContextStoppingCollector` to call `ssc.stop()` only in the
first batch, avoiding the deadlock.
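
For illustration only (a hedged sketch, not the patch itself): the general
hazard is that `ssc.stop()` waits for the listener bus thread, so calling it
from inside a listener callback can block forever. One safe pattern, using a
hypothetical listener class, is to trigger the stop at most once and from a
separate thread.

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener used only to illustrate the safe stopping pattern.
class SafeStoppingListener(ssc: StreamingContext) extends StreamingListener {
  private val stopped = new AtomicBoolean(false)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Stop at most once, and never on the listener bus thread itself.
    if (stopped.compareAndSet(false, true)) {
      new Thread("stop-streaming-context") {
        override def run(): Unit = ssc.stop(stopSparkContext = false)
      }.start()
    }
  }
}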

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10011 from zsxwing/fix-test-deadlock.

(cherry picked from commit f57e6c9effdb9e282fc8ae66dc30fe053fed5272)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99663579
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99663579
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99663579

Branch: refs/heads/branch-1.6
Commit: 9966357932a50aa22f94f39201559beb8c0c6efb
Parents: dfc98fa
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Nov 27 11:50:28 2015 -0800

--
 .../streaming/StreamingListenerSuite.scala  | 25 +++-
 1 file changed, 19 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99663579/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
 val batchCounter = new BatchCounter(_ssc)
 _ssc.start()
 // Make sure running at least one batch
-batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, 
timeout = 1)
+if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 
1, timeout = 1)) {
+  fail("The first batch cannot complete in 10 seconds")
+}
+// When reaching here, we can make sure 
`StreamingContextStoppingCollector` won't call
+// `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
 _ssc.stop()
 assert(contextStoppingCollector.sparkExSeen)
   }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends 
StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted) {
-try {
-  ssc.stop()
-} catch {
-  case se: SparkException =>
-sparkExSeen = true
+if (isFirstBatch) {
+  // We should only call `ssc.stop()` in the first batch. Otherwise, it's 
possible that the main
+  // thread is calling `ssc.stop()`, while 
StreamingContextStoppingCollector is also calling
+  // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+  isFirstBatch = false
+  try {
+ssc.stop()
+  } catch {
+case se: SparkException =>
+  sparkExSeen = true
+  }
 }
   }
 }





spark git commit: [SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite

2015-11-27 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ba02f6cb5 -> f57e6c9ef


[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in 
StreamingListenerSuite

In StreamingListenerSuite."don't call ssc.stop in listener", after the main
thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may also call
`ssc.stop()` on the listener bus thread, which results in a deadlock. This PR
updates `StreamingContextStoppingCollector` to call `ssc.stop()` only in the
first batch, avoiding the deadlock.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10011 from zsxwing/fix-test-deadlock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f57e6c9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f57e6c9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f57e6c9e

Branch: refs/heads/master
Commit: f57e6c9effdb9e282fc8ae66dc30fe053fed5272
Parents: ba02f6c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Nov 27 11:50:18 2015 -0800

--
 .../streaming/StreamingListenerSuite.scala  | 25 +++-
 1 file changed, 19 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f57e6c9e/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with 
Matchers {
 val batchCounter = new BatchCounter(_ssc)
 _ssc.start()
 // Make sure running at least one batch
-batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, 
timeout = 1)
+if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 
1, timeout = 1)) {
+  fail("The first batch cannot complete in 10 seconds")
+}
+// When reaching here, we can make sure 
`StreamingContextStoppingCollector` won't call
+// `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
 _ssc.stop()
 assert(contextStoppingCollector.sparkExSeen)
   }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends 
StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted) {
-try {
-  ssc.stop()
-} catch {
-  case se: SparkException =>
-sparkExSeen = true
+if (isFirstBatch) {
+  // We should only call `ssc.stop()` in the first batch. Otherwise, it's 
possible that the main
+  // thread is calling `ssc.stop()`, while 
StreamingContextStoppingCollector is also calling
+  // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+  isFirstBatch = false
+  try {
+ssc.stop()
+  } catch {
+case se: SparkException =>
+  sparkExSeen = true
+  }
 }
   }
 }





spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b1fcefca6 -> 7900d192e


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes, so
that `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and,
beyond that, queues tasks in the `LinkedBlockingQueue`.
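
As a standalone sketch of the fixed construction (names are illustrative, and
the thread factory here comes from Guava rather than Spark's internal helper):
core size equals max size, an unbounded `LinkedBlockingQueue` holds overflow
tasks, and core-thread timeout lets the pool shrink when idle.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object CachedPoolSketch {
  def newDaemonCachedPool(prefix: String, maxThreads: Int, keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
    val factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
    val pool = new ThreadPoolExecutor(
      maxThreads,        // corePoolSize: threads created before tasks start queuing
      maxThreads,        // maximumPoolSize: never reached because the queue is unbounded
      keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable],
      factory)
    // Let idle core threads die so the pool still behaves like a cached pool.
    pool.allowCoreThreadTimeOut(true)
    pool
  }
}

With this construction, submitting more than maxThreads tasks simply queues the
extras instead of rejecting them, which the SynchronousQueue version could not do.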

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192

Branch: refs/heads/branch-1.5
Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764
Parents: b1fcefc
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:53 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 06976f8..3159ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
*/
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): 
ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
 val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
-  0, maxThreadNumber, 60L, TimeUnit.SECONDS, new 
SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
+  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
+  keepAliveSeconds,
+  TimeUnit.SECONDS,
+  new LinkedBlockingQueue[Runnable],
+  threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
 }
   }
 
+  test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+  "ThreadUtilsSuite-newDaemonCachedThreadPool",
+  maxThreadNumber,
+  keepAliveSeconds = 2)
+try {
+  for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+  }
+})
+  }
+  startThreadsLatch.await(10, TimeUnit.SECONDS)
+  assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+  assert(cachedThreadPool.getQueue.size === 0)
+
+  // Submit a new task and it should be put into the queue since t

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 1df3e8230 -> f5af299ab


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes, so
that `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and,
beyond that, queues tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5af299a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5af299a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5af299a

Branch: refs/heads/branch-1.4
Commit: f5af299ab06f654e000c99917c703f989bffaa43
Parents: 1df3e82
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:35:33 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index ca5624a..94d581f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
*/
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): 
ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
 val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
-  0, maxThreadNumber, 60L, TimeUnit.SECONDS, new 
SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
+  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
+  keepAliveSeconds,
+  TimeUnit.SECONDS,
+  new LinkedBlockingQueue[Runnable],
+  threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f5af299a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 8c51e6b..1fb6364 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -58,6 +60,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
 }
   }
 
+  test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+  "ThreadUtilsSuite-newDaemonCachedThreadPool",
+  maxThreadNumber,
+  keepAliveSeconds = 2)
+try {
+  for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+  }
+})
+  }
+  startThreadsLatch.await(10, TimeUnit.SECONDS)
+  assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+  assert(cachedThreadPool.getQueue.size === 0)
+
+  // Submit a new task and it should be p

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7e7f2627f -> 0df6beccc


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes, so
that `newDaemonCachedThreadPool` uses at most `maxThreadNumber` threads and,
beyond that, queues tasks in the `LinkedBlockingQueue`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.

(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc

Branch: refs/heads/branch-1.6
Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d
Parents: 7e7f262
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:36 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
*/
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): 
ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
 val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
-  0, maxThreadNumber, 60L, TimeUnit.SECONDS, new 
SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
+  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
+  keepAliveSeconds,
+  TimeUnit.SECONDS,
+  new LinkedBlockingQueue[Runnable],
+  threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
 }
   }
 
+  test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+  "ThreadUtilsSuite-newDaemonCachedThreadPool",
+  maxThreadNumber,
+  keepAliveSeconds = 2)
+try {
+  for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+  }
+})
+  }
+  startThreadsLatch.await(10, TimeUnit.SECONDS)
+  assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+  assert(cachedThreadPool.getQueue.size === 0)
+
+  // Submit a new task and it should be put into the queue since the thread
+  // number reaches the limitation

spark git commit: [SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool doesn't cache any task

2015-11-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 068b6438d -> d3ef69332


[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool 
doesn't cache any task

In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`, which 
is wrong: a `SynchronousQueue` is a zero-capacity handoff queue and cannot cache 
any task. This patch switches to a `LinkedBlockingQueue` and, together with other 
fixes, makes sure `newDaemonCachedThreadPool` uses at most `maxThreadNumber` 
threads and, once that limit is reached, caches further tasks in the queue.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9978 from zsxwing/cached-threadpool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933

Branch: refs/heads/master
Commit: d3ef693325f91a1ed340c9756c81244a80398eb2
Parents: 068b643
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:21 2015 -0800

--
 .../org/apache/spark/util/ThreadUtils.scala | 14 --
 .../apache/spark/util/ThreadUtilsSuite.scala| 45 
 2 files changed, 56 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is 
`maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
integer.
*/
-  def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): 
ThreadPoolExecutor = {
+  def newDaemonCachedThreadPool(
+  prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
ThreadPoolExecutor = {
 val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
-  0, maxThreadNumber, 60L, TimeUnit.SECONDS, new 
SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+  maxThreadNumber, // corePoolSize: the max number of threads to create 
before queuing the tasks
+  maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, 
this one is not used
+  keepAliveSeconds,
+  TimeUnit.SECONDS,
+  new LinkedBlockingQueue[Runnable],
+  threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{Await, Future}
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually._
+
 import org.apache.spark.SparkFunSuite
 
 class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
 }
   }
 
+  test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+  "ThreadUtilsSuite-newDaemonCachedThreadPool",
+  maxThreadNumber,
+  keepAliveSeconds = 2)
+try {
+  for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+  }
+})
+  }
+  startThreadsLatch.await(10, TimeUnit.SECONDS)
+  assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+  assert(cachedThreadPool.getQueue.size === 0)
+
+  // Submit a new task and it should be put into the queue since the 
thread number reaches the
+  // limitation
+  cachedThreadPool.execute(new Runnable {
+override def run(): Unit = {

spark git commit: [SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 19530da69 -> 81012546e


[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's 
thread

This is a continuation of SPARK-11761.

Andrew suggested adding this protection; see the tail of 
https://github.com/apache/spark/pull/9741

Author: tedyu 

Closes #9852 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81012546
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81012546
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81012546

Branch: refs/heads/master
Commit: 81012546ee5a80d2576740af0dad067b0f5962c5
Parents: 19530da
Author: tedyu 
Authored: Tue Nov 24 12:22:33 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 12:22:33 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala| 31 
 2 files changed, 35 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b153a7b..e19ba11 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+if (AsynchronousListenerBus.withinListenerThread.value) {
+  throw new SparkException("Cannot stop SparkContext within listener 
thread of" +
+" AsynchronousListenerBus")
+}
 // Use the stopping variable to ensure no contention for the stop scenario.
 // Still track the stopped variable for use elsewhere in the code.
 if (!stopped.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/81012546/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e5458..f20d5be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+sc = new SparkContext("local", "SparkListenerSuite")
+val listener = new SparkContextStoppingListener(sc)
+val bus = new LiveListenerBus
+bus.addListener(listener)
+
+// Starting listener bus should flush all buffered events
+bus.start(sc)
+bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+bus.stop()
+assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
 val counter = new BasicJobCounter
 val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends 
SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+try {
+  sc.stop()
+} catch {
+  case se: SparkException =>
+sparkExSeen = true
+}
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends 
SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
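
As a side note, here is a minimal sketch of the guard pattern above, assuming (as 
the diff suggests) that `withinListenerThread` is a `DynamicVariable[Boolean]` that 
the listener bus sets while dispatching events on its own thread; the dispatch 
loop, guard and exception type below are simplified stand-ins, not Spark's classes.

```scala
import scala.util.DynamicVariable

object ListenerGuardSketch {
  // Stand-in for AsynchronousListenerBus.withinListenerThread.
  val withinListenerThread = new DynamicVariable[Boolean](false)

  // Stand-in for SparkContext.stop(): refuse to run when invoked from the
  // listener dispatch thread, which would otherwise end up waiting on itself.
  def stop(): Unit = {
    if (withinListenerThread.value) {
      throw new IllegalStateException("Cannot stop from within the listener thread")
    }
    println("stopped")
  }

  def main(args: Array[String]): Unit = {
    // Simplified dispatch: events are delivered with the flag set to true.
    def dispatch(event: String)(handler: String => Unit): Unit =
      withinListenerThread.withValue(true) { handler(event) }

    // Calling stop() from inside a listener is detected and rejected...
    try dispatch("jobEnd")(_ => stop())
    catch { case e: IllegalStateException => println(s"caught: ${e.getMessage}") }

    // ...while calling it from anywhere else still works.
    stop()
  }
}
```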


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 4d6bbbc03 -> a5d988763


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of old 
log files are still in progress when the write ahead log closes, the pending 
deletes may be rejected with a `RejectedExecutionException`. This is okay and 
should be handled gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The test fails because `writeAheadLog.close` is called in `afterEach` while 
async deletes may still be in flight.
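
To make the failure mode concrete, here is a small standalone sketch (using the 
JDK executor directly rather than the `Future` the patch wraps) of why submitting 
work after shutdown throws, and why catching `RejectedExecutionException` and 
logging a warning is an acceptable response:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

object WriteAheadLogCloseSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()

    // Simulate close(): the pool backing the async deletes is shut down while
    // another code path may still try to schedule a delete of an old log file.
    pool.shutdown()

    try {
      pool.execute(new Runnable {
        override def run(): Unit = println("deleting old log file")
      })
    } catch {
      case e: RejectedExecutionException =>
        // Same spirit as the patch: warn and move on; recovery correctness
        // does not depend on this best-effort cleanup.
        println(s"delete rejected after shutdown: ${e.getClass.getSimpleName}")
    }
  }
}
```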

tdas zsxwing

Author: Burak Yavuz <brk...@gmail.com>

Closes #9953 from brkyvz/flaky-ss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5d98876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5d98876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5d98876

Branch: refs/heads/master
Commit: a5d988763319f63a8e2b58673dd4f9098f17c835
Parents: 4d6bbbc
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 24 20:58:47 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5d98876/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
 }
 oldLogFiles.foreach { logInfo =>
   if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
-  import scala.concurrent.duration._
-  Await.ready(f, 1 second)
+try {
+  val f = Future { deleteFile(logInfo) }(executionContext)
+  if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+  }
+} catch {
+  case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old 
WriteAheadLogs. " +
+  "This would not affect recovery correctness.", e)
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 862d788fc -> b18112666


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of old 
log files are still in progress when the write ahead log closes, the pending 
deletes may be rejected with a `RejectedExecutionException`. This is okay and 
should be handled gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The test fails because `writeAheadLog.close` is called in `afterEach` while 
async deletes may still be in flight.

tdas zsxwing

Author: Burak Yavuz <brk...@gmail.com>

Closes #9953 from brkyvz/flaky-ss.

(cherry picked from commit a5d988763319f63a8e2b58673dd4f9098f17c835)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1811266
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1811266
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1811266

Branch: refs/heads/branch-1.6
Commit: b18112666adec0a942d1cfe8d6b9f1e7c7201fcd
Parents: 862d788
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 24 20:59:01 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1811266/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
 }
 oldLogFiles.foreach { logInfo =>
   if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
-  import scala.concurrent.duration._
-  Await.ready(f, 1 second)
+try {
+  val f = Future { deleteFile(logInfo) }(executionContext)
+  if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+  }
+} catch {
+  case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old 
WriteAheadLogs. " +
+  "This would not affect recovery correctness.", e)
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 151d7c2ba -> 216988688


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and 
recovered from checkpoint file

This solves the following exception, thrown when an empty state RDD is 
checkpointed and recovered. The root cause is that an empty 
OpenHashMapBasedStateMap cannot be deserialized because its initialCapacity is 
set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): 
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:96)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:86)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```
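
To illustrate the root cause described above, here is a minimal sketch (not Spark's 
`StateMap` code, whose internals are not shown here) of the failure pattern in the 
stack trace and the usual fix: a custom `readObject` that rebuilds internal storage 
must clamp the recorded capacity to a sane minimum, because an empty map 
legitimately round-trips a capacity of zero.

```scala
import java.io._

// Stand-in for the backing open hash map: its constructor rejects capacity 0,
// which is the `require` that fires in the stack trace above.
class BackingMap(initialCapacity: Int) {
  require(initialCapacity >= 1, "Invalid initial capacity")
  // real storage elided
}

// A toy state map that records its capacity and rebuilds the backing map in a
// custom readObject, as the report describes.
class TinyStateMap(startCapacity: Int) extends Serializable {
  @transient private var backing = new BackingMap(math.max(1, startCapacity))
  private var capacity = startCapacity // an empty map can legitimately record 0

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Buggy version: new BackingMap(capacity) blows up when capacity == 0.
    // Fixed version, in the spirit of the patch: clamp to a minimum capacity.
    backing = new BackingMap(math.max(1, capacity))
  }
}

object TinyStateMapSketch {
  private def roundTrip[T](t: T): T = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(t)
    oos.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // With the clamp in readObject this round-trips fine even for capacity 0;
    // without it, this is exactly the IllegalArgumentException in the trace.
    roundTrip(new TinyStateMap(startCapacity = 0))
    println("empty map deserialized successfully")
  }
}
```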

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21698868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21698868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21698868

Branch: refs/heads/master
Commit: 2169886883d33b33acf378ac42a626576b342df1
Parents: 151d7c2
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:01 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -
 .../apache/spark/streaming/StateMapSuite.scala  | 30 +---
 .../streaming/rdd/TrackStateRDDSuite.scala  | 10 +++
 3 files changed, 42 insertions(+), 17 deletions(-)

spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68bcb9b33 -> 7f030aa42


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and 
recovered from checkpoint file

This solves the following exception, thrown when an empty state RDD is 
checkpointed and recovered. The root cause is that an empty 
OpenHashMapBasedStateMap cannot be deserialized because its initialCapacity is 
set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most 
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): 
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:96)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:86)
at 
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.

(cherry picked from commit 2169886883d33b33acf378ac42a626576b342df1)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f030aa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f030aa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f030aa4

Branch: refs/heads/branch-1.6
Commit: 7f030aa422802a8e7077e1c74a59ab9a5fe54488
Parents: 68bcb9b
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:29 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -
 .../apache/spark/streaming/StateMapSuite.scala  | 30 +---
 .../streaming/rdd/TrackStateRDDSuite.scala  | 10 +++
 3 files changed, 42 insertions(+), 17 deletions(-)
 
