Repository: spark Updated Branches: refs/heads/master c3527a333 -> 77f836799
SPARK-1497. Fix scalastyle warnings in YARN, Hive code (I wasn't sure how to automatically set `SPARK_YARN=true` and `SPARK_HIVE=true` when running scalastyle, but these are the errors that turn up.) Author: Sean Owen <[email protected]> Closes #413 from srowen/SPARK-1497 and squashes the following commits: f0c9318 [Sean Owen] Fix more scalastyle warnings in yarn 80bf4c3 [Sean Owen] Add YARN alpha / YARN profile to scalastyle check 026319c [Sean Owen] Fix scalastyle warnings in YARN, Hive code Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77f83679 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77f83679 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77f83679 Branch: refs/heads/master Commit: 77f836799639ea939a1773cef2f4828b381f5ca2 Parents: c3527a3 Author: Sean Owen <[email protected]> Authored: Wed Apr 16 09:34:59 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Wed Apr 16 09:34:59 2014 -0700 ---------------------------------------------------------------------- dev/scalastyle | 4 ++++ .../spark/deploy/yarn/ExecutorLauncher.scala | 21 ++++++++++++-------- .../deploy/yarn/YarnAllocationHandler.scala | 11 +++++----- .../spark/deploy/yarn/ApplicationMaster.scala | 3 ++- .../spark/deploy/yarn/ExecutorLauncher.scala | 8 +++++--- .../deploy/yarn/YarnAllocationHandler.scala | 7 ++++--- 6 files changed, 34 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/dev/scalastyle ---------------------------------------------------------------------- diff --git a/dev/scalastyle b/dev/scalastyle index 19955b9..7b572f6 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -18,6 +18,10 @@ # echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt +# Check style with YARN alpha built too +SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt +# Check style with YARN built too +SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt ERRORS=$(cat scalastyle.txt | grep -e "\<error\>") if test ! -z "$ERRORS"; then echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS" http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 7b0e020..21f1457 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging { - def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf) + def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = + this(args, new Configuration(), sparkConf) def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) @@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp override def preStart() { logInfo("Listen to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events. + // Send a hello message thus the connection is actually established, thus we can + // monitor Lifecycle Events. driver ! "Hello" context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } @@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp // Allocate all containers allocateExecutors() - // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout - // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. + // Launch a progress reporter thread, else app will get killed after expiration + // (def: 10mins) timeout ensure that progress is sent before + // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. @@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest]) .asInstanceOf[RegisterApplicationMasterRequest] appMasterRequest.setApplicationAttemptId(appAttemptId) - // Setting this to master host,port - so that the ApplicationReport at client has some sensible info. - // Users can then monitor stderr/stdout on that node if required. + // Setting this to master host,port - so that the ApplicationReport at client has + // some sensible info. Users can then monitor stderr/stdout on that node if required. appMasterRequest.setHost(Utils.localHostName()) appMasterRequest.setRpcPort(0) // What do we provide here ? Might make sense to expose something sensible later ? @@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { - yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) + yarnAllocator.allocateContainers( + math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) Thread.sleep(100) } @@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp while (!driverClosed) { val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning if (missingExecutorCount > 0) { - logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers") + logInfo("Allocating " + missingExecutorCount + + " containers to make up for (potentially ?) lost containers") yarnAllocator.allocateContainers(missingExecutorCount) } else sendProgress() http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 2056667..d6d46a5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler( val executorHostname = container.getNodeId.getHost val containerId = container.getId - assert( - container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + assert( container.getResource.getMemory >= + (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of @@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler( // default. if (numExecutors <= 0 || preferredHostToCount.isEmpty) { - logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty) - resourceRequests = List( - createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY)) + logDebug("numExecutors: " + numExecutors + ", host preferences: " + + preferredHostToCount.isEmpty) + resourceRequests = List(createResourceRequest( + AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY)) } else { // request for all hosts in preferred nodes and for numExecutors - http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 61af0f9..581cfe4 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase - System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) + System.setProperty( + "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) } /** Get the Yarn approved local directories. */ http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index b697f10..67ed591 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp override def preStart() { logInfo("Listen to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events. + // Send a hello message thus the connection is actually established, + // thus we can monitor Lifecycle Events. driver ! "Hello" context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } @@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp // Allocate all containers allocateExecutors() - // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout - // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. + // Launch a progress reporter thread, else app will get killed after expiration + // (def: 10mins) timeout ensure that progress is sent before + // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. http://git-wip-us.apache.org/repos/asf/spark/blob/77f83679/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index e31c406..4fafae1 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler( allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } - logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname)) + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( + driverUrl, executorHostname)) val executorRunnable = new ExecutorRunnable( container, conf, @@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler( // `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) } else { - // Decrement the number of executors running. The next iteration of the ApplicationMaster's - // reporting thread will take care of allocating. + // Decrement the number of executors running. The next iteration of + // the ApplicationMaster's reporting thread will take care of allocating. numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( containerId,
