Repository: spark Updated Branches: refs/heads/master 84b23453d -> 32be51fba
[SPARK-15323][SPARK-14463][SQL] Fix reading of partitioned format=text datasets https://issues.apache.org/jira/browse/SPARK-15323 I was using partitioned text datasets in Spark 1.6.1 but it broke in Spark 2.0.0. It would be logical if you could also write those, but not entirely sure how to solve this with the new DataSet implementation. Also it doesn't work using `sqlContext.read.text`, since that method returns a `DataSet[String]`. See https://issues.apache.org/jira/browse/SPARK-14463 for that issue. Author: Jurriaan Pruis <[email protected]> Closes #13104 from jurriaan/fix-partitioned-text-reads. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32be51fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32be51fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32be51fb Branch: refs/heads/master Commit: 32be51fba45f5e07a2a3520293c12dc7765a364d Parents: 84b2345 Author: Jurriaan Pruis <[email protected]> Authored: Wed May 18 16:15:09 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed May 18 16:15:09 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameReader.scala | 3 ++- .../datasources/text/DefaultSource.scala | 14 -------------- .../text-partitioned/year=2014/data.txt | 1 + .../text-partitioned/year=2015/data.txt | 1 + .../execution/datasources/text/TextSuite.scala | 20 ++++++++++++++++++++ 5 files changed, 24 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 011aff4..e33fd83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def text(paths: String*): Dataset[String] = { - format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder) + format("text").load(paths : _*).select("value") + .as[String](sparkSession.implicits.newStringEncoder) } /////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index f22c024..f091615 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override private[sql] def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`. - buildReader( - sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) - } - override def buildReader( sparkSession: SparkSession, dataSchema: StructType, @@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp recordWriter.close(context) } } - http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/resources/text-partitioned/year=2014/data.txt ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt new file mode 100644 index 0000000..e271942 --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt @@ -0,0 +1 @@ +2014-test http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/resources/text-partitioned/year=2015/data.txt ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt new file mode 100644 index 0000000..b8c03da --- /dev/null +++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt @@ -0,0 +1 @@ +2015-test http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index f61fce5..b5e51e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("reading partitioned data using read.text()") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.text(partitionedData) + val data = df.collect() + + assert(df.schema == new StructType().add("value", StringType)) + assert(data.length == 2) + } + + test("support for partitioned reading") { + val partitionedData = Thread.currentThread().getContextClassLoader + .getResource("text-partitioned").toString + val df = spark.read.format("text").load(partitionedData) + val data = df.filter("year = '2015'").select("value").collect() + + assert(data(0) == Row("2015-test")) + assert(data.length == 1) + } + test("SPARK-13503 Support to specify the option for compression codec for TEXT") { val testDf = spark.read.text(testFile) val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
