Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2aa25833c -> 26e978a93


[SPARK-17711] Compress rolled executor log

## What changes were proposed in this pull request?

This PR adds support for executor log compression.

## How was this patch tested?

Unit tests

cc: yhuai tdas mengxr

Author: Yu Peng <loneknigh...@gmail.com>

Closes #15285 from loneknightpy/compress-executor-log.

(cherry picked from commit 231f39e3f6641953a90bc4c40444ede63f363b23)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26e978a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26e978a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26e978a9

Branch: refs/heads/branch-2.0
Commit: 26e978a93f029e1a1b5c7524d0b52c8141b70997
Parents: 2aa2583
Author: Yu Peng <loneknigh...@gmail.com>
Authored: Tue Oct 18 13:23:31 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 18 13:23:50 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/worker/ui/LogPage.scala |  7 +-
 .../scala/org/apache/spark/util/Utils.scala     | 80 +++++++++++++++--
 .../util/logging/RollingFileAppender.scala      | 45 ++++++++--
 .../spark/deploy/worker/ui/LogPageSuite.scala   |  6 +-
 .../apache/spark/util/FileAppenderSuite.scala   | 60 ++++++++++++-
 .../org/apache/spark/util/UtilsSuite.scala      | 92 +++++++++++++++-----
 docs/configuration.md                           |  8 ++
 docs/spark-standalone.md                        |  9 ++
 8 files changed, 263 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 3473c41..465c214 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
@@ -138,7 +140,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends 
WebUIPage("logPage") with
       val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, 
logType)
       logDebug(s"Sorted log files of type $logType in 
$logDirectory:\n${files.mkString("\n")}")
 
-      val totalLength = files.map { _.length }.sum
+      val fileLengths: Seq[Long] = files.map(Utils.getFileLength(_, 
worker.conf))
+      val totalLength = fileLengths.sum
       val offset = offsetOption.getOrElse(totalLength - byteLength)
       val startIndex = {
         if (offset < 0) {
@@ -151,7 +154,7 @@ private[ui] class LogPage(parent: WorkerWebUI) extends 
WebUIPage("logPage") with
       }
       val endIndex = math.min(startIndex + byteLength, totalLength)
       logDebug(s"Getting log from $startIndex to $endIndex")
-      val logText = Utils.offsetBytes(files, startIndex, endIndex)
+      val logText = Utils.offsetBytes(files, fileLengths, startIndex, endIndex)
       logDebug(s"Got log of length ${logText.length} bytes")
       (logText, startIndex, endIndex, totalLength)
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7764fdc..a8532b2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -27,6 +27,7 @@ import java.nio.file.Files
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.zip.GZIPInputStream
 import javax.net.ssl.HttpsURLConnection
 
 import scala.annotation.tailrec
@@ -38,8 +39,10 @@ import scala.reflect.ClassTag
 import scala.util.Try
 import scala.util.control.{ControlThrowable, NonFatal}
 
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -55,6 +58,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
+import org.apache.spark.util.logging.RollingFileAppender
 
 /** CallSite represents a place in user code. It can have a short and a long 
form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
@@ -1448,14 +1452,72 @@ private[spark] object Utils extends Logging {
     CallSite(shortForm, longForm)
   }
 
+  private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
+    "spark.worker.ui.compressedLogFileLengthCacheSize"
+  private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
+  private var compressedLogFileLengthCache: LoadingCache[String, 
java.lang.Long] = null
+  private def getCompressedLogFileLengthCache(
+      sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = 
this.synchronized {
+    if (compressedLogFileLengthCache == null) {
+      val compressedLogFileLengthCacheSize = sparkConf.getInt(
+        UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
+        DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
+      compressedLogFileLengthCache = CacheBuilder.newBuilder()
+        .maximumSize(compressedLogFileLengthCacheSize)
+        .build[String, java.lang.Long](new CacheLoader[String, 
java.lang.Long]() {
+        override def load(path: String): java.lang.Long = {
+          Utils.getCompressedFileLength(new File(path))
+        }
+      })
+    }
+    compressedLogFileLengthCache
+  }
+
+  /**
+   * Return the file length, if the file is compressed it returns the 
uncompressed file length.
+   * It also caches the uncompressed file size to avoid repeated 
decompression. The cache size is
+   * read from workerConf.
+   */
+  def getFileLength(file: File, workConf: SparkConf): Long = {
+    if (file.getName.endsWith(".gz")) {
+      getCompressedLogFileLengthCache(workConf).get(file.getAbsolutePath)
+    } else {
+      file.length
+    }
+  }
+
+  /** Return uncompressed file length of a compressed file. */
+  private def getCompressedFileLength(file: File): Long = {
+    try {
+      // Uncompress .gz file to determine file size.
+      var fileSize = 0L
+      val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+      val bufSize = 1024
+      val buf = new Array[Byte](bufSize)
+      var numBytes = IOUtils.read(gzInputStream, buf)
+      while (numBytes > 0) {
+        fileSize += numBytes
+        numBytes = IOUtils.read(gzInputStream, buf)
+      }
+      fileSize
+    } catch {
+      case e: Throwable =>
+        logError(s"Cannot get file length of ${file}", e)
+        throw e
+    }
+  }
+
   /** Return a string containing part of a file from byte 'start' to 'end'. */
-  def offsetBytes(path: String, start: Long, end: Long): String = {
+  def offsetBytes(path: String, length: Long, start: Long, end: Long): String 
= {
     val file = new File(path)
-    val length = file.length()
     val effectiveEnd = math.min(length, end)
     val effectiveStart = math.max(0, start)
     val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
-    val stream = new FileInputStream(file)
+    val stream = if (path.endsWith(".gz")) {
+      new GZIPInputStream(new FileInputStream(file))
+    } else {
+      new FileInputStream(file)
+    }
 
     try {
       ByteStreams.skipFully(stream, effectiveStart)
@@ -1471,8 +1533,8 @@ private[spark] object Utils extends Logging {
    * and `endIndex` is based on the cumulative size of all the files take in
    * the given order. See figure below for more details.
    */
-  def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
-    val fileLengths = files.map { _.length }
+  def offsetBytes(files: Seq[File], fileLengths: Seq[Long], start: Long, end: 
Long): String = {
+    assert(files.length == fileLengths.length)
     val startIndex = math.max(start, 0)
     val endIndex = math.min(end, fileLengths.sum)
     val fileToLength = files.zip(fileLengths).toMap
@@ -1480,7 +1542,7 @@ private[spark] object Utils extends Logging {
 
     val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
     var sum = 0L
-    for (file <- files) {
+    files.zip(fileLengths).foreach { case (file, fileLength) =>
       val startIndexOfFile = sum
       val endIndexOfFile = sum + fileToLength(file)
       logDebug(s"Processing file $file, " +
@@ -1499,19 +1561,19 @@ private[spark] object Utils extends Logging {
 
       if (startIndex <= startIndexOfFile  && endIndex >= endIndexOfFile) {
         // Case C: read the whole file
-        stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, 
fileToLength(file)))
+        stringBuffer.append(offsetBytes(file.getAbsolutePath, fileLength, 0, 
fileToLength(file)))
       } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) 
{
         // Case A and B: read from [start of required range] to [end of file / 
end of range]
         val effectiveStartIndex = startIndex - startIndexOfFile
         val effectiveEndIndex = math.min(endIndex - startIndexOfFile, 
fileToLength(file))
         stringBuffer.append(Utils.offsetBytes(
-          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+          file.getAbsolutePath, fileLength, effectiveStartIndex, 
effectiveEndIndex))
       } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
         // Case D: read from [start of file] to [end of require range]
         val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
         val effectiveEndIndex = endIndex - startIndexOfFile
         stringBuffer.append(Utils.offsetBytes(
-          file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+          file.getAbsolutePath, fileLength, effectiveStartIndex, 
effectiveEndIndex))
       }
       sum += fileToLength(file)
       logDebug(s"After processing file $file, string built is 
${stringBuffer.toString}")

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index a0eb05c..5d8cec8 100644
--- 
a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileFilter, InputStream}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 
@@ -45,6 +47,7 @@ private[spark] class RollingFileAppender(
   import RollingFileAppender._
 
   private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+  private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
 
   /** Stop the appender */
   override def stop() {
@@ -76,6 +79,33 @@ private[spark] class RollingFileAppender(
     }
   }
 
+  // Roll the log file and compress if enableCompression is true.
+  private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
+    if (enableCompression) {
+      val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
+      var gzOutputStream: GZIPOutputStream = null
+      var inputStream: InputStream = null
+      try {
+        inputStream = new FileInputStream(activeFile)
+        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
+        IOUtils.copy(inputStream, gzOutputStream)
+        inputStream.close()
+        gzOutputStream.close()
+        activeFile.delete()
+      } finally {
+        IOUtils.closeQuietly(inputStream)
+        IOUtils.closeQuietly(gzOutputStream)
+      }
+    } else {
+      Files.move(activeFile, rolloverFile)
+    }
+  }
+
+  // Check if the rollover file already exists.
+  private def rolloverFileExist(file: File): Boolean = {
+    file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
+  }
+
   /** Move the active log file to a new rollover file */
   private def moveFile() {
     val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
@@ -83,8 +113,8 @@ private[spark] class RollingFileAppender(
       activeFile.getParentFile, activeFile.getName + 
rolloverSuffix).getAbsoluteFile
     logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
     if (activeFile.exists) {
-      if (!rolloverFile.exists) {
-        Files.move(activeFile, rolloverFile)
+      if (!rolloverFileExist(rolloverFile)) {
+        rotateFile(activeFile, rolloverFile)
         logInfo(s"Rolled over $activeFile to $rolloverFile")
       } else {
         // In case the rollover file name clashes, make a unique file name.
@@ -97,11 +127,11 @@ private[spark] class RollingFileAppender(
           altRolloverFile = new File(activeFile.getParent,
             s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
           i += 1
-        } while (i < 10000 && altRolloverFile.exists)
+        } while (i < 10000 && rolloverFileExist(altRolloverFile))
 
         logWarning(s"Rollover file $rolloverFile already exists, " +
           s"rolled over $activeFile to file $altRolloverFile")
-        Files.move(activeFile, altRolloverFile)
+        rotateFile(activeFile, altRolloverFile)
       }
     } else {
       logWarning(s"File $activeFile does not exist")
@@ -142,6 +172,9 @@ private[spark] object RollingFileAppender {
   val SIZE_DEFAULT = (1024 * 1024).toString
   val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
   val DEFAULT_BUFFER_SIZE = 8192
+  val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
+
+  val GZIP_LOG_SUFFIX = ".gz"
 
   /**
    * Get the sorted list of rolled over files. This assumes that the all the 
rolled
@@ -158,6 +191,6 @@ private[spark] object RollingFileAppender {
       val file = new File(directory, activeFileName).getAbsoluteFile
       if (file.exists) Some(file) else None
     }
-    rolledOverFiles ++ activeFile
+    rolledOverFiles.sortBy(_.getName.stripSuffix(GZIP_LOG_SUFFIX)) ++ 
activeFile
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
index 72eaffb..4c3e967 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
@@ -22,16 +22,20 @@ import java.io.{File, FileWriter}
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.worker.Worker
 
 class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
 
   test("get logs simple") {
     val webui = mock(classOf[WorkerWebUI])
+    val worker = mock(classOf[Worker])
     val tmpDir = new File(sys.props("java.io.tmpdir"))
     val workDir = new File(tmpDir, "work-dir")
     workDir.mkdir()
     when(webui.workDir).thenReturn(workDir)
+    when(webui.worker).thenReturn(worker)
+    when(worker.conf).thenReturn(new SparkConf())
     val logPage = new LogPage(webui)
 
     // Prepare some fake log files to read later

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala 
b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 4fa9f9a..7e2da8e 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.util
 import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.CountDownLatch
+import java.util.zip.GZIPInputStream
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
 import org.apache.log4j.{Appender, Level, Logger}
 import org.apache.log4j.spi.LoggingEvent
 import org.mockito.ArgumentCaptor
@@ -72,6 +74,25 @@ class FileAppenderSuite extends SparkFunSuite with 
BeforeAndAfter with Logging {
     testRolling(appender, testOutputStream, textToAppend, 
rolloverIntervalMillis)
   }
 
+  test("rolling file appender - time-based rolling (compressed)") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverIntervalMillis = 100
+    val durationMillis = 1000
+    val numRollovers = durationMillis / rolloverIntervalMillis
+    val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", 
false),
+      sparkConf, 10)
+
+    testRolling(
+      appender, testOutputStream, textToAppend, rolloverIntervalMillis, 
isCompressed = true)
+  }
+
   test("rolling file appender - size-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -89,6 +110,25 @@ class FileAppenderSuite extends SparkFunSuite with 
BeforeAndAfter with Logging {
     }
   }
 
+  test("rolling file appender - size-based rolling (compressed)") {
+    // setup input stream and appender
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+    val rolloverSize = 1000
+    val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+    val appender = new RollingFileAppender(testInputStream, testFile,
+      new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)
+
+    val files = testRolling(appender, testOutputStream, textToAppend, 0, 
isCompressed = true)
+    files.foreach { file =>
+      logInfo(file.toString + ": " + file.length + " bytes")
+      assert(file.length < rolloverSize)
+    }
+  }
+
   test("rolling file appender - cleaning") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
@@ -273,7 +313,8 @@ class FileAppenderSuite extends SparkFunSuite with 
BeforeAndAfter with Logging {
       appender: FileAppender,
       outputStream: OutputStream,
       textToAppend: Seq[String],
-      sleepTimeBetweenTexts: Long
+      sleepTimeBetweenTexts: Long,
+      isCompressed: Boolean = false
     ): Seq[File] = {
     // send data to appender through the input stream, and wait for the data 
to be written
     val expectedText = textToAppend.mkString("")
@@ -290,10 +331,23 @@ class FileAppenderSuite extends SparkFunSuite with 
BeforeAndAfter with Logging {
     // verify whether all the data written to rolled over files is same as 
expected
     val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
       testFile.getParentFile.toString, testFile.getName)
-    logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+    logInfo("Generate files: \n" + generatedFiles.mkString("\n"))
     assert(generatedFiles.size > 1)
+    if (isCompressed) {
+      assert(
+        
generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size
 > 0)
+    }
     val allText = generatedFiles.map { file =>
-      Files.toString(file, StandardCharsets.UTF_8)
+      if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
+        val inputStream = new GZIPInputStream(new FileInputStream(file))
+        try {
+          IOUtils.toString(inputStream, StandardCharsets.UTF_8)
+        } finally {
+          IOUtils.closeQuietly(inputStream)
+        }
+      } else {
+        Files.toString(file, StandardCharsets.UTF_8)
+      }
     }.mkString("")
     assert(allText === expectedText)
     generatedFiles

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4715fd2..2741ad7 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets
 import java.text.DecimalFormatSymbols
 import java.util.Locale
 import java.util.concurrent.TimeUnit
+import java.util.zip.GZIPOutputStream
 
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
 import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.commons.math3.stat.inference.ChiSquareTest
 import org.apache.hadoop.conf.Configuration
@@ -274,65 +276,109 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
     assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11" + sep + 
"00 h")
   }
 
-  test("reading offset bytes of a file") {
+  def getSuffix(isCompressed: Boolean): String = {
+    if (isCompressed) {
+      ".gz"
+    } else {
+      ""
+    }
+  }
+
+  def writeLogFile(path: String, content: Array[Byte]): Unit = {
+    val outputStream = if (path.endsWith(".gz")) {
+      new GZIPOutputStream(new FileOutputStream(path))
+    } else {
+      new FileOutputStream(path)
+    }
+    IOUtils.write(content, outputStream)
+    outputStream.close()
+    content.size
+  }
+
+  private val workerConf = new SparkConf()
+
+  def testOffsetBytes(isCompressed: Boolean): Unit = {
     val tmpDir2 = Utils.createTempDir()
-    val f1Path = tmpDir2 + "/f1"
-    val f1 = new FileOutputStream(f1Path)
-    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
-    f1.close()
+    val suffix = getSuffix(isCompressed)
+    val f1Path = tmpDir2 + "/f1" + suffix
+    writeLogFile(f1Path, 
"1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+    val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
 
     // Read first few bytes
-    assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+    assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
 
     // Read some middle bytes
-    assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+    assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
 
     // Read last few bytes
-    assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+    assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
 
     // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+    assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
 
     // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+    assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
 
     // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+    assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === 
"1\n2\n3\n4\n5\n6\n7\n8\n9\n")
 
     Utils.deleteRecursively(tmpDir2)
   }
 
-  test("reading offset bytes across multiple files") {
+  test("reading offset bytes of a file") {
+    testOffsetBytes(isCompressed = false)
+  }
+
+  test("reading offset bytes of a file (compressed)") {
+    testOffsetBytes(isCompressed = true)
+  }
+
+  def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
     val tmpDir = Utils.createTempDir()
-    val files = (1 to 3).map(i => new File(tmpDir, i.toString))
-    Files.write("0123456789", files(0), StandardCharsets.UTF_8)
-    Files.write("abcdefghij", files(1), StandardCharsets.UTF_8)
-    Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8)
+    val suffix = getSuffix(isCompressed)
+    val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ 
new File(tmpDir, "4")
+    writeLogFile(files(0).getAbsolutePath, 
"0123456789".getBytes(StandardCharsets.UTF_8))
+    writeLogFile(files(1).getAbsolutePath, 
"abcdefghij".getBytes(StandardCharsets.UTF_8))
+    writeLogFile(files(2).getAbsolutePath, 
"ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+    writeLogFile(files(3).getAbsolutePath, 
"9876543210".getBytes(StandardCharsets.UTF_8))
+    val fileLengths = files.map(Utils.getFileLength(_, workerConf))
 
     // Read first few bytes in the 1st file
-    assert(Utils.offsetBytes(files, 0, 5) === "01234")
+    assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
 
     // Read bytes within the 1st file
-    assert(Utils.offsetBytes(files, 5, 8) === "567")
+    assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
 
     // Read bytes across 1st and 2nd file
-    assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+    assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
 
     // Read bytes across 1st, 2nd and 3rd file
-    assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+    assert(Utils.offsetBytes(files, fileLengths, 5, 24) === 
"56789abcdefghijABCD")
+
+    // Read bytes across 3rd and 4th file
+    assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
 
     // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+    assert(Utils.offsetBytes(files, fileLengths, -5, 18) === 
"0123456789abcdefgh")
 
     // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+    assert(Utils.offsetBytes(files, fileLengths, 18, 45) === 
"ijABCDEFGHIJ9876543210")
 
     // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(files, -5, 35) === 
"0123456789abcdefghijABCDEFGHIJ")
+    assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+      "0123456789abcdefghijABCDEFGHIJ9876543210")
 
     Utils.deleteRecursively(tmpDir)
   }
 
+  test("reading offset bytes across multiple files") {
+    testOffsetBytesMultipleFiles(isCompressed = false)
+  }
+
+  test("reading offset bytes across multiple files (compressed)") {
+    testOffsetBytesMultipleFiles(isCompressed = true)
+  }
+
   test("deserialize long value") {
     val testval : Long = 9730889947L
     val bbuf = ByteBuffer.allocate(8)

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index db088dd..d9eddf9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -294,6 +294,14 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.executor.logs.rolling.enableCompression</code></td>
+  <td>false</td>
+  <td>
+    Enable executor log compression. If it is enabled, the rolled executor 
logs will be compressed.
+    Disabled by default.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executor.logs.rolling.maxSize</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/26e978a9/docs/spark-standalone.md
----------------------------------------------------------------------
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 6f0f665..9915487 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -250,6 +250,15 @@ SPARK_WORKER_OPTS supports the following system properties:
     especially if you run jobs very frequently.
   </td>
 </tr>
+<tr>
+  <td><code>spark.worker.ui.compressedLogFileLengthCacheSize</code></td>
+  <td>100</td>
+  <td>
+    For compressed log files, the uncompressed file can only be computed by 
uncompressing the files.
+    Spark caches the uncompressed file size of compressed log files. This 
property controls the cache
+    size.
+  </td>
+</tr>
 </table>
 
 # Connecting an Application to the Cluster


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to