Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6913cfe4b -> 6d6fe8085


[SPARK-11814][STREAMING] Add better default checkpoint duration

DStream checkpoint interval is by default set to max(10 seconds, batch 
interval). That's bad for large batch intervals, where the checkpoint interval = 
batch interval, and RDDs get checkpointed every batch.
This PR sets the checkpoint interval of trackStateByKey to 10 * batch 
duration.

Author: Tathagata Das <[email protected]>

Closes #9805 from tdas/SPARK-11814.

(cherry picked from commit a402c92c92b2e1c85d264f6077aec8f6d6a08270)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d6fe808
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d6fe808
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d6fe808

Branch: refs/heads/branch-1.6
Commit: 6d6fe80857e8dc322550ca1d872aeee165b53054
Parents: 6913cfe
Author: Tathagata Das <[email protected]>
Authored: Wed Nov 18 16:08:06 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Nov 18 16:09:29 2015 -0800

----------------------------------------------------------------------
 .../streaming/dstream/TrackStateDStream.scala   | 13 ++++++
 .../spark/streaming/TrackStateByKeySuite.scala  | 44 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d6fe808/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala
index 98e881e..0ada111 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.{EmptyRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.{TrackStateRDD, TrackStateRDDRecord}
+import org.apache.spark.streaming.dstream.InternalTrackStateDStream._
 
 /**
  * :: Experimental ::
@@ -120,6 +121,14 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, 
S: ClassTag, E: ClassT
   /** Enable automatic checkpointing */
   override val mustCheckpoint = true
 
+  /** Override the default checkpoint duration */
+  override def initialize(time: Time): Unit = {
+    if (checkpointDuration == null) {
+      checkpointDuration = slideDuration * 
DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
+    }
+    super.initialize(time)
+  }
+
   /** Method that generates a RDD for the given time */
   override def compute(validTime: Time): Option[RDD[TrackStateRDDRecord[K, S, 
E]]] = {
     // Get the previous state or create a new empty state RDD
@@ -141,3 +150,7 @@ class InternalTrackStateDStream[K: ClassTag, V: ClassTag, 
S: ClassTag, E: ClassT
     }
   }
 }
+
+private[streaming] object InternalTrackStateDStream {
+  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6d6fe808/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
index e3072b4..58aef74 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala
@@ -22,9 +22,10 @@ import java.io.File
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
 
+import org.scalatest.PrivateMethodTester._
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.streaming.dstream.{TrackStateDStream, 
TrackStateDStreamImpl}
+import org.apache.spark.streaming.dstream.{InternalTrackStateDStream, 
TrackStateDStream, TrackStateDStreamImpl}
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
@@ -57,6 +58,12 @@ class TrackStateByKeySuite extends SparkFunSuite with 
BeforeAndAfterAll with Bef
     sc = new SparkContext(conf)
   }
 
+  override def afterAll(): Unit = {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
   test("state - get, exists, update, remove, ") {
     var state: StateImpl[Int] = null
 
@@ -436,6 +443,41 @@ class TrackStateByKeySuite extends SparkFunSuite with 
BeforeAndAfterAll with Bef
     assert(collectedStateSnapshots.last.toSet === Set(("a", 1)))
   }
 
+  test("trackStateByKey - checkpoint durations") {
+    val privateMethod = PrivateMethod[InternalTrackStateDStream[_, _, _, 
_]]('internalStream)
+
+    def testCheckpointDuration(
+        batchDuration: Duration,
+        expectedCheckpointDuration: Duration,
+        explicitCheckpointDuration: Option[Duration] = None
+      ): Unit = {
+      try {
+        ssc = new StreamingContext(sc, batchDuration)
+        val inputStream = new TestInputStream(ssc, Seq.empty[Seq[Int]], 
2).map(_ -> 1)
+        val dummyFunc = (value: Option[Int], state: State[Int]) => 0
+        val trackStateStream = 
inputStream.trackStateByKey(StateSpec.function(dummyFunc))
+        val internalTrackStateStream = trackStateStream invokePrivate 
privateMethod()
+
+        explicitCheckpointDuration.foreach { d =>
+          trackStateStream.checkpoint(d)
+        }
+        trackStateStream.register()
+        ssc.start()  // should initialize all the checkpoint durations
+        assert(trackStateStream.checkpointDuration === null)
+        assert(internalTrackStateStream.checkpointDuration === 
expectedCheckpointDuration)
+      } finally {
+        StreamingContext.getActive().foreach { _.stop(stopSparkContext = 
false) }
+      }
+    }
+
+    testCheckpointDuration(Milliseconds(100), Seconds(1))
+    testCheckpointDuration(Seconds(1), Seconds(10))
+    testCheckpointDuration(Seconds(10), Seconds(100))
+
+    testCheckpointDuration(Milliseconds(100), Seconds(2), Some(Seconds(2)))
+    testCheckpointDuration(Seconds(1), Seconds(2), Some(Seconds(2)))
+    testCheckpointDuration(Seconds(10), Seconds(20), Some(Seconds(20)))
+  }
 
   private def testOperation[K: ClassTag, S: ClassTag, T: ClassTag](
       input: Seq[Seq[K]],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to