Repository: spark
Updated Branches:
  refs/heads/master 4ce3bab89 -> 3eaed8769


[SPARK-8080] [STREAMING] Receiver.store with Iterator does not give correct 
count at Spark UI

tdas zsxwing, this is the new PR for SPARK-8080.

I have merged https://github.com/apache/spark/pull/6659

Also note: with the MEMORY_ONLY storage level, when a block cannot be unrolled
safely (unrollSafely) into memory because there is not enough space,
BlockManager will not try to put the block, and ReceivedBlockHandler will throw
a SparkException because it cannot find the block id in the PutResult. Thus
the number of records in a block is not counted if the block fails to unroll
in memory, which is fine.

With the MEMORY_AND_DISK storage level, if BlockManager is not able to unroll
the block into memory, the block is still serialized to disk. The same holds
for the WAL-based store. So in those cases (storage level = memory + disk),
the number of records is counted even though the block could not be unrolled
into memory.

Thus I added isFullyConsumed to CountingIterator, but have not used it in the
handlers, as it will never happen that a block is not fully consumed and
ReceivedBlockHandler still gets the block ID.
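To make the isFullyConsumed behavior concrete, here is a minimal standalone
sketch (not the committed class) of a counting iterator with the same
contract: it counts the elements handed out by next() and reports a count only
once the wrapped iterator is exhausted. The accessor is named recordCount here
(an assumption for the sketch) to keep it visibly separate from Iterator's own
count(p) method. The counter is also bumped after the underlying next()
succeeds rather than before it.

```scala
// Standalone sketch with the same contract as the CountingIterator in this
// commit: count the elements handed out by next(), but only report the count
// once the wrapped iterator has been fully consumed.
final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var consumed = 0L

  private def isFullyConsumed: Boolean = !underlying.hasNext

  override def hasNext: Boolean = underlying.hasNext

  override def next(): T = {
    val elem = underlying.next() // advance first, so a failing next() is not counted
    consumed += 1
    elem
  }

  // Some(n) once every element has been consumed, None otherwise.
  def recordCount: Option[Long] = if (isFullyConsumed) Some(consumed) else None
}

object CountingIteratorDemo {
  def main(args: Array[String]): Unit = {
    // A consumer that drains everything (e.g. a successful store): count is known.
    val full = new CountingIterator(Iterator.fill(70)(new Array[Byte](100)))
    full.foreach(_ => ())
    println(full.recordCount) // Some(70)

    // A consumer that stops early: the count is withheld.
    val partial = new CountingIterator(Iterator.range(0, 10))
    partial.take(3).foreach(_ => ())
    println(partial.recordCount) // None
  }
}
```

Returning None instead of a partial number means a caller can never report a
count for data that was not actually handed to the store.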

I have added a few test cases to cover these block-unrolling scenarios as well.

Author: Dibyendu Bhattacharya <[email protected]>
Author: U-PEROOT\UBHATD1 <[email protected]>

Closes #6707 from dibbhatt/master and squashes the following commits:

f6cb6b5 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI
f37cfd8 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI
5a8344a [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI Count ByteBufferBlock as 1 
count
fceac72 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI
0153e7e [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI Fixed comments given by 
@zsxwing
4c5931d [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with 
Iterator does not give correct count at Spark UI
01e6dc8 [U-PEROOT\UBHATD1] A


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eaed876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eaed876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eaed876

Branch: refs/heads/master
Commit: 3eaed8769c16e887edb9d54f5816b4ee6da23de5
Parents: 4ce3bab
Author: Dibyendu Bhattacharya <[email protected]>
Authored: Thu Jun 18 19:58:47 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu Jun 18 20:00:05 2015 -0700

----------------------------------------------------------------------
 .../receiver/ReceivedBlockHandler.scala         |  53 ++++++-
 .../receiver/ReceiverSupervisorImpl.scala       |   7 +-
 .../streaming/ReceivedBlockHandlerSuite.scala   | 154 ++++++++++++++++++-
 .../streaming/ReceivedBlockTrackerSuite.scala   |   2 +-
 4 files changed, 194 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3eaed876/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d..c8dd6e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId  // Any implementation of this trait will store a block id
+  // Any implementation of this trait will store a block id
+  def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of records
+  def numRecords: Option[Long]
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(
+      blockId: StreamBlockId, numRecords: Option[Long])
   extends ReceivedBlockStoreResult
 
 
@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    var numRecords = None: Option[Long]
+
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+        numRecords = Some(arrayBuffer.size.toLong)
+        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
+          tellMaster = true)
       case IteratorBlock(iterator) =>
-        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+        val countIterator = new CountingIterator(iterator)
+        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
+          tellMaster = true)
+        numRecords = countIterator.count
+        putResult
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
       case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    BlockManagerBasedStoreResult(blockId, numRecords)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    numRecords: Option[Long],
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
    */
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
+    var numRecords = None: Option[Long]
     // Serialize the block so that it can be inserted into both
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
+        numRecords = Some(arrayBuffer.size.toLong)
         blockManager.dataSerialize(blockId, arrayBuffer.iterator)
       case IteratorBlock(iterator) =>
-        blockManager.dataSerialize(blockId, iterator)
+        val countIterator = new CountingIterator(iterator)
+        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+        numRecords = countIterator.count
+        serializedBlock
       case ByteBufferBlock(byteBuffer) =>
         byteBuffer
       case _ =>
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
     new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
   }
 }
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
+   private var _count = 0
+
+   private def isFullyConsumed: Boolean = !iterator.hasNext
+
+   def hasNext(): Boolean = iterator.hasNext
+
+   def count(): Option[Long] = {
+     if (isFullyConsumed) Some(_count) else None
+   }
+
+   def next(): T = {
+    _count += 1
+    iterator.next()
+   }
+}
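The three storeBlock branches above differ only in where the record count comes
from. The sketch below models that with hypothetical, simplified stand-ins: the
ReceivedBlock subtypes are redeclared locally, `store` is an assumed sink in
place of BlockManager.putIterator / dataSerialize, and a mapped iterator with a
local counter replaces CountingIterator. It illustrates the idea under those
assumptions and is not the handler code itself. A ByteBufferBlock is opaque
serialized data, so its count stays None, which matches the tests expecting
None for ByteBufferBlock.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for Spark's ReceivedBlock subtypes.
sealed trait ReceivedBlock
final case class ArrayBufferBlock(buf: ArrayBuffer[_]) extends ReceivedBlock
final case class IteratorBlock(it: Iterator[Any]) extends ReceivedBlock
final case class ByteBufferBlock(bytes: ByteBuffer) extends ReceivedBlock

object RecordCounting {
  // `store` is an assumed sink standing in for BlockManager.putIterator /
  // dataSerialize; like them, it pulls from the iterator it is given.
  def storeAndCount(block: ReceivedBlock, store: Iterator[Any] => Unit): Option[Long] =
    block match {
      case ArrayBufferBlock(buf) =>
        store(buf.iterator)
        Some(buf.size.toLong) // size is known before storing
      case IteratorBlock(it) =>
        var seen = 0L
        val counting = it.map { x => seen += 1; x } // count as the sink pulls
        store(counting)
        if (counting.hasNext) None else Some(seen) // trust only a drained iterator
      case ByteBufferBlock(_) =>
        None // opaque serialized bytes (stored as-is, not modeled): count unknown
    }
}
```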

http://git-wip-us.apache.org/repos/asf/spark/blob/3eaed876/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8be732b..6078cdf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
       blockIdOption: Option[StreamBlockId]
     ) {
     val blockId = blockIdOption.getOrElse(nextBlockId)
-    val numRecords = receivedBlock match {
-      case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
-      case _ => None
-    }
-
     val time = System.currentTimeMillis
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
+    val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
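With this change the supervisor stops pattern-matching on the block type and
simply forwards the count the handler reported. Below is a simplified sketch of
that contract. The types StoreResult, BlockInfo and Supervisor are hypothetical
stand-ins for ReceivedBlockStoreResult, ReceivedBlockInfo and
ReceiverSupervisorImpl. The totalRecords helper, which treats a None count as
contributing nothing, is an assumption about how a consumer might aggregate
counts and is not taken from this diff.

```scala
// Hypothetical, simplified mirror of the ReceivedBlockStoreResult contract:
// every store result now carries an optional record count.
trait StoreResult {
  def blockId: Long
  def numRecords: Option[Long]
}

final case class BlockManagerResult(blockId: Long, numRecords: Option[Long])
  extends StoreResult

final case class WalResult(blockId: Long, numRecords: Option[Long], walHandle: String)
  extends StoreResult

final case class BlockInfo(streamId: Int, numRecords: Option[Long], result: StoreResult)

object Supervisor {
  // Forward whatever count the handler reported; no block-type inspection here.
  def report(streamId: Int, result: StoreResult): BlockInfo =
    BlockInfo(streamId, result.numRecords, result)

  // Assumption: blocks with an unknown (None) count add nothing to the total.
  def totalRecords(infos: Seq[BlockInfo]): Long =
    infos.flatMap(_.numRecords).sum
}
```

Putting numRecords on the trait keeps the counting logic next to the code that
actually consumes the data, so both handler implementations must provide it.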

http://git-wip-us.apache.org/repos/asf/spark/blob/3eaed876/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index cca8ced..6c0c926 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -49,7 +49,6 @@ class ReceivedBlockHandlerSuite
 
   val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
   val hadoopConf = new Configuration()
-  val storageLevel = StorageLevel.MEMORY_ONLY_SER
   val streamId = 1
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -57,10 +56,12 @@ class ReceivedBlockHandlerSuite
   val serializer = new KryoSerializer(conf)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
+  val blockManagerBuffer = new ArrayBuffer[BlockManager]()
 
   var rpcEnv: RpcEnv = null
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
+  var storageLevel: StorageLevel = null
   var tempDirectory: File = null
 
   before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
 
-    blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
-      blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
-    blockManager.initialize("app-id")
+    storageLevel = StorageLevel.MEMORY_ONLY_SER
+    blockManager = createBlockManager(blockManagerSize, conf)
 
     tempDirectory = Utils.createTempDir()
     manualClock.setTime(0)
   }
 
   after {
-    if (blockManager != null) {
-      blockManager.stop()
-      blockManager = null
+    for ( blockManager <- blockManagerBuffer ) {
+      if (blockManager != null) {
+        blockManager.stop()
+      }
     }
+    blockManager = null
+    blockManagerBuffer.clear()
     if (blockManagerMaster != null) {
       blockManagerMaster.stop()
       blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
     }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in MEMORY,
+    // but the block is still serialized by BlockManager and written to the WAL,
+    // and hence count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in MEMORY,
+    // but BlockManager will be able to serialize this block to DISK,
+    // and hence count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+    // BlockManager will not be able to unroll this block
+    // and hence it will not tryToPut this block, resulting in a SparkException
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      val thrown = intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+    manager.initialize("app-id")
+    blockManagerBuffer += manager
+    manager
+  }
+
+  /**
+   * Test storing of data using different types of handlers, storage levels and ReceivedBlocks
+   * and verify the correct record count
+   */
+  private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
+      sLevel: StorageLevel,
+      receivedBlock: ReceivedBlock,
+      bManager: BlockManager,
+      expectedNumRecords: Option[Long]
+      ) {
+    blockManager = bManager
+    storageLevel = sLevel
+    var bId: StreamBlockId = null
+    try {
+      if (isBlockManagedBasedBlockHandler) {
+        // test received block with BlockManager based handler
+        withBlockManagerBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using BlockManagerBasedBlockHandler with " + sLevel)
+       }
+      } else {
+        // test received block with WAL based handler
+        withWriteAheadLogBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
+        }
+      }
+    } finally {
+     // Removing the Block Id to use same blockManager for next test
+     blockManager.removeBlock(bId, true)
+    }
+  }
+
   /**
    * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
    * using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
     (blockIds, storeResults)
   }
 
+  /** Store single block using a handler */
+  private def storeSingleBlock(
+      handler: ReceivedBlockHandler,
+      block: ReceivedBlock
+    ): (StreamBlockId, ReceivedBlockStoreResult) = {
+    val blockId = generateBlockId
+    val blockStoreResult = handler.storeBlock(blockId, block)
+    logDebug("Done inserting")
+    (blockId, blockStoreResult)
+  }
+
   private def getWriteAheadLogFiles(): Seq[String] = {
     getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
   }
 
   private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
 }
+
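The suite now creates block managers through createBlockManager, which records
each instance in blockManagerBuffer so that the after block can stop every one
of them, including the small one created by the isFullyConsumed test. Below is
a generic sketch of that register-then-tear-down pattern. The Resource class is
a hypothetical stand-in for BlockManager, and ResourceTracker is an illustrative
name, not part of the suite.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for BlockManager: anything that must be stopped.
final class Resource(val name: String) {
  private var _stopped = false
  def stopped: Boolean = _stopped
  def stop(): Unit = { _stopped = true }
}

// Every instance created through the factory is registered, so teardown can
// stop all of them rather than only the one last assigned to a field.
final class ResourceTracker {
  private val created = ArrayBuffer.empty[Resource]

  def create(name: String): Resource = {
    val r = new Resource(name)
    created += r
    r
  }

  def stopAll(): Unit = {
    created.foreach(_.stop())
    created.clear()
  }

  def live: Int = created.size
}
```

In a ScalaTest suite, create would be called from before and from individual
tests, and stopAll from after.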

http://git-wip-us.apache.org/repos/asf/spark/blob/3eaed876/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index be305b5..f793a12 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
     List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
   /** Get all the data written in the given write ahead log file. */

