Repository: spark Updated Branches: refs/heads/branch-2.0 4c0af3bbd -> 36acf8856
[SPARK-15323][SPARK-14463][SQL] Fix reading of partitioned format=text datasets https://issues.apache.org/jira/browse/SPARK-15323 I was using partitioned text datasets in Spark 1.6.1 but it broke in Spark 2.0.0. It would be logical if you could also write those, but not entirely sure how to solve this with the new DataSet implementation. Also it doesn't work using `sqlContext.read.text`, since that method returns a `DataSet[String]`. See https://issues.apache.org/jira/browse/SPARK-14463 for that issue. Author: Jurriaan Pruis <[email protected]> Closes #13104 from jurriaan/fix-partitioned-text-reads. (cherry picked from commit 32be51fba45f5e07a2a3520293c12dc7765a364d) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36acf885 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36acf885 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36acf885 Branch: refs/heads/branch-2.0 Commit: 36acf8856c94f93f3b16f4592b6d5fb64acda39d Parents: 4c0af3b Author: Jurriaan Pruis <[email protected]> Authored: Wed May 18 16:15:09 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed May 18 16:15:22 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameReader.scala | 3 ++- .../datasources/text/DefaultSource.scala | 14 -------------- .../text-partitioned/year=2014/data.txt | 1 + .../text-partitioned/year=2015/data.txt | 1 + .../execution/datasources/text/TextSuite.scala | 20 ++++++++++++++++++++ 5 files changed, 24 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/36acf885/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 011aff4..e33fd83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def text(paths: String*): Dataset[String] = { - format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder) + format("text").load(paths : _*).select("value") + .as[String](sparkSession.implicits.newStringEncoder) } /////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/spark/blob/36acf885/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index f22c024..f091615 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override private[sql] def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`. - buildReader( - sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) - } - override def buildReader( sparkSession: SparkSession, dataSchema: StructType, @@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp recordWriter.close(context) } } - http://git-wip-us.apache.org/repos/asf/spark/blob/36acf885/sql/core/src/test/resources/text-partitioned/year=2014/data.txt ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt new file mode 100644 index 0000000..e271942 --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt @@ -0,0 +1 @@ +2014-test http://git-wip-us.apache.org/repos/asf/spark/blob/36acf885/sql/core/src/test/resources/text-partitioned/year=2015/data.txt ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt new file mode 100644 index 0000000..b8c03da --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt @@ -0,0 +1 @@ +2015-test http://git-wip-us.apache.org/repos/asf/spark/blob/36acf885/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index f61fce5..b5e51e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("reading partitioned data using read.text()") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.text(partitionedData) + val data = df.collect() + + assert(df.schema == new StructType().add("value", StringType)) + assert(data.length == 2) + } + + test("support for partitioned reading") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.format("text").load(partitionedData) + val data = df.filter("year = '2015'").select("value").collect() + + assert(data(0) == Row("2015-test")) + assert(data.length == 1) + } + test("SPARK-13503 Support to specify the option for compression codec for TEXT") { val testDf = spark.read.text(testFile) val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
