This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 618d6bf  [SPARK-27588] Binary file data source fails fast and doesn't 
attempt to read very large files
618d6bf is described below

commit 618d6bff71073c8c93501ab7392c3cc579730f0b
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Mon Apr 29 16:24:49 2019 -0700

    [SPARK-27588] Binary file data source fails fast and doesn't attempt to 
read very large files
    
    ## What changes were proposed in this pull request?
    
    If a file is too big (>2GB), we should fail fast and not try to read the 
file.
    
    ## How was this patch tested?
    
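    Added a unit test to BinaryFileFormatSuite ("fail fast and do not attempt
    to read if a file is too big"). It checks that a file whose length equals
    the configured max length is still read, and that a file exceeding the max
    length fails with a SparkException ("exceeds the max length allowed") even
    when the file has been made unreadable, i.e. without attempting to read it.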
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.
    
    Closes #24483 from mengxr/SPARK-27588.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  8 ++++++
 .../datasources/binaryfile/BinaryFileFormat.scala  |  8 ++++++
 .../binaryfile/BinaryFileFormatSuite.scala         | 31 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 96d3f5c..87bce1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1744,6 +1744,14 @@ object SQLConf {
          "and from_utc_timestamp() functions.")
     .booleanConf
     .createWithDefault(false)
+
+  val SOURCES_BINARY_FILE_MAX_LENGTH = 
buildConf("spark.sql.sources.binaryFile.maxLength")
+    .doc("The max length of a file that can be read by the binary file data 
source. " +
+      "Spark will fail fast and not attempt to read the file if its length 
exceeds this value. " +
+      "The theoretical max is Int.MaxValue, though VMs might implement a 
smaller max.")
+    .internal()
+    .intConf
+    .createWithDefault(Int.MaxValue)
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index db93268..2637784 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, 
GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with 
DataSourceRegister {
     val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
     val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
     val filterFuncs = filters.map(filter => createFilterFunction(filter))
+    val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
       val path = new Path(file.filePath)
@@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with 
DataSourceRegister {
             case (MODIFICATION_TIME, i) =>
               writer.write(i, 
DateTimeUtils.fromMillis(status.getModificationTime))
             case (CONTENT, i) =>
+              if (status.getLen > maxLength) {
+                throw new SparkException(
+                  s"The length of ${status.getPath} is ${status.getLen}, " +
+                    s"which exceeds the max length allowed: ${maxLength}.")
+              }
               val stream = fs.open(status.getPath)
               try {
                 writer.write(i, ByteStreams.toByteArray(stream))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index fb83c3c..01dc96c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSQLContext with SQLTest
     assert(df.select("LENGTH").first().getLong(0) === content.length,
       "column pruning should be case insensitive")
   }
+
+  test("fail fast and do not attempt to read if a file is too big") {
+    assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+    withTempPath { file =>
+      val path = file.getPath
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
+      def readContent(): DataFrame = {
+        spark.read.format(BINARY_FILE)
+          .load(path)
+          .select(CONTENT)
+      }
+      val expected = Seq(Row(content))
+      QueryTest.checkAnswer(readContent(), expected)
+      withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> 
content.length.toString) {
+        QueryTest.checkAnswer(readContent(), expected)
+      }
+      // Disable read. If the implementation attempts to read, the exception 
would be different.
+      file.setReadable(false)
+      val caught = intercept[SparkException] {
+        withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 
1).toString) {
+          QueryTest.checkAnswer(readContent(), expected)
+        }
+      }
+      assert(caught.getMessage.contains("exceeds the max length allowed"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to