Repository: spark Updated Branches: refs/heads/master 40e080a68 -> edf8a56ab
Remote BlockFetchTracker trait This trait seems to have been created a while ago when there were multiple implementations; now that there's just one, I think it makes sense to merge it into the BlockFetcherIterator trait. Author: Kay Ousterhout <[email protected]> Closes #39 from kayousterhout/remove_tracker and squashes the following commits: 8173939 [Kay Ousterhout] Remote BlockFetchTracker. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edf8a56a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edf8a56a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edf8a56a Branch: refs/heads/master Commit: edf8a56ab7eaee1f7c3b4579eb10464984d31d7a Parents: 40e080a Author: Kay Ousterhout <[email protected]> Authored: Thu Feb 27 21:52:55 2014 -0800 Committer: Patrick Wendell <[email protected]> Committed: Thu Feb 27 21:52:55 2014 -0800 ---------------------------------------------------------------------- .../spark/storage/BlockFetchTracker.scala | 27 ------------------- .../spark/storage/BlockFetcherIterator.scala | 28 ++++++++++++-------- 2 files changed, 17 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/edf8a56a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala deleted file mode 100644 index 2e0b0e6..0000000 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -private[spark] trait BlockFetchTracker { - def totalBlocks : Int - def numLocalBlocks: Int - def numRemoteBlocks: Int - def remoteFetchTime : Long - def fetchWaitTime: Long - def remoteBytesRead : Long -} http://git-wip-us.apache.org/repos/asf/spark/blob/edf8a56a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 925022e..fb50b45 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -44,9 +44,14 @@ import org.apache.spark.util.Utils */ private[storage] -trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] - with Logging with BlockFetchTracker { +trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() + def totalBlocks: Int + def numLocalBlocks: Int + def numRemoteBlocks: Int + def remoteFetchTime: Long + def fetchWaitTime: Long + def remoteBytesRead: Long } @@ -233,7 +238,16 @@ object BlockFetcherIterator { logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") } - //an iterator that will read fetched blocks off the queue as they arrive. + override def totalBlocks: Int = numLocal + numRemote + override def numLocalBlocks: Int = numLocal + override def numRemoteBlocks: Int = numRemote + override def remoteFetchTime: Long = _remoteFetchTime + override def fetchWaitTime: Long = _fetchWaitTime + override def remoteBytesRead: Long = _remoteBytesRead + + + // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue + // as they arrive. @volatile protected var resultsGotten = 0 override def hasNext: Boolean = resultsGotten < _numBlocksToFetch @@ -251,14 +265,6 @@ object BlockFetcherIterator { } (result.blockId, if (result.failed) None else Some(result.deserialize())) } - - // Implementing BlockFetchTracker trait. - override def totalBlocks: Int = numLocal + numRemote - override def numLocalBlocks: Int = numLocal - override def numRemoteBlocks: Int = numRemote - override def remoteFetchTime: Long = _remoteFetchTime - override def fetchWaitTime: Long = _fetchWaitTime - override def remoteBytesRead: Long = _remoteBytesRead } // End of BasicBlockFetcherIterator
