Repository: spark
Updated Branches:
  refs/heads/master 393db655c -> 272a2f78f


[SPARK-15990][YARN] Add rolling log aggregation support for Spark on yarn

## What changes were proposed in this pull request?

YARN has supported rolling log aggregation since 2.6. Previously, logs were 
aggregated to HDFS only after the application finished, which is painful for 
long-running applications such as Spark Streaming and the Thrift server. Disks 
can also fill up when a log file grows too large. This patch proposes adding 
support for rolling log aggregation to Spark on YARN.

One limitation is that log4j must be configured to use a file appender. Spark 
itself uses the console appender by default, in which case the log file is not 
created again once it has been removed after aggregation. However, many 
production users have probably already changed their log4j configuration from 
the default, so this should not be a big problem.
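
To make the interplay of the two new patterns concrete, here is a minimal sketch of the filtering rule described in the documentation change: a log file is rolled only if it matches the include pattern and does not match the exclude pattern. `shouldAggregate` is a hypothetical helper, not part of Spark or YARN, and the sketch assumes that file names are tested with a partial (find-style) regex match, which this patch does not confirm.

```scala
import java.util.regex.Pattern

object RolledLogPatternSketch {
  // Hypothetical helper, not part of Spark or YARN.
  // Assumption: a file name is tested with a partial (find-style) match.
  def shouldAggregate(
      fileName: String,
      include: String,
      exclude: Option[String]): Boolean = {
    val included = Pattern.compile(include).matcher(fileName).find()
    val excluded = exclude.exists(p => Pattern.compile(p).matcher(fileName).find())
    // When both patterns match, the exclude pattern wins.
    included && !excluded
  }

  def main(args: Array[String]): Unit = {
    // Illustrative file names only; real names depend on the log4j configuration.
    val files = Seq("spark.log", "spark.log.1", "stdout", "stderr")
    files.foreach { f =>
      println(s"$f -> ${shouldAggregate(f, "spark*", Some("\\.1$"))}")
    }
  }
}
```

Under that assumption, an include pattern such as `spark*` selects `spark.log`, while a file that matches both patterns is left out of rolling aggregation.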

## How was this patch tested?

Manually verified with Hadoop 2.7.1.

Author: jerryshao <ss...@hortonworks.com>

Closes #13712 from jerryshao/SPARK-15990.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/272a2f78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/272a2f78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/272a2f78

Branch: refs/heads/master
Commit: 272a2f78f3ff801b94a81fa8fcc6633190eaa2f4
Parents: 393db65
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jun 29 08:17:27 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Jun 29 08:17:27 2016 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         | 24 +++++++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 27 ++++++++++++++++++++
 .../org/apache/spark/deploy/yarn/config.scala   | 16 ++++++++++++
 3 files changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/272a2f78/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index dbd46cc..4e92042 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -472,6 +472,30 @@ To use a custom metrics.properties for the application master and executors, upd
   Currently supported services are: <code>hive</code>, <code>hbase</code>
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.rolledLog.includePattern</code></td>
+  <td>(none)</td>
+  <td>
+  Java Regex to filter the log files which match the defined include pattern
+  and those log files will be aggregated in a rolling fashion.
+  This will be used with YARN's rolling log aggregation, to enable this feature in YARN side
+  <code>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</code> should be
+  configured in yarn-site.xml.
+  This feature can only be used with Hadoop 2.6.1+. The Spark log4j appender needs be changed to use
+  FileAppender or another appender that can handle the files being removed while its running. Based
+  on the file name configured in the log4j configuration (like spark.log), the user should set the
+  regex (spark*) to include all the log files that need to be aggregated.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.rolledLog.excludePattern</code></td>
+  <td>(none)</td>
+  <td>
+  Java Regex to filter the log files which match the defined exclude pattern
+  and those log files will not be aggregated in a rolling fashion. If the log file
+  name matches both the include and the exclude pattern, this file will be excluded eventually.
+  </td>
+</tr>
 </table>
 
 # Important notes

http://git-wip-us.apache.org/repos/asf/spark/blob/272a2f78/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9bb3695..d63579f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -271,6 +271,33 @@ private[spark] class Client(
         appContext.setResource(capability)
     }
 
+    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
+      try {
+        val logAggregationContext = Records.newRecord(
+          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+          .asInstanceOf[Object]
+
+        val setRolledLogsIncludePatternMethod =
+          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
+        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
+
+        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+          val setRolledLogsExcludePatternMethod =
+            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
+          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
+        }
+
+        val setLogAggregationContextMethod =
+          appContext.getClass.getMethod("setLogAggregationContext",
+            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
+            s"does not support it", e)
+      }
+    }
+
     appContext
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/272a2f78/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ad2412e..49c0177 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -243,6 +243,22 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  /* Rolled log aggregation configuration. */
+
+  private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.includePattern")
+      .doc("Java Regex to filter the log files which match the defined include pattern and those " +
+        "log files will be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
+  private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.excludePattern")
+      .doc("Java Regex to filter the log files which match the defined exclude pattern and those " +
+        "log files will not be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
   /* Private configs. */
 
   private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")

