This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 3b4fd1d0831 [SPARK-42757][CONNECT] Implement textFile for DataFrameReader 3b4fd1d0831 is described below commit 3b4fd1d083110a150d83ac249f7e3c7dc1af5e60 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Wed Mar 15 08:41:27 2023 +0900 [SPARK-42757][CONNECT] Implement textFile for DataFrameReader ### What changes were proposed in this pull request? This PR implements `textFile` for the Spark Connect Scala client's `DataFrameReader`. ### Why are the changes needed? API coverage. ### Does this PR introduce _any_ user-facing change? Yes, a new method: `DataFrameReader.textFile`. ### How was this patch tested? Added a new test case, `textFile`, to `ClientE2ETestSuite`. Closes #40377 from panbingkun/connect_textFile. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 0b8c1482a9504fd3d4ac0245b068072df4ebf427) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../scala/org/apache/spark/sql/DataFrameReader.scala | 17 +++++++++++++---- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ad921bcc4e3..193eb4faaab 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -24,8 +24,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Stable import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} import org.apache.spark.sql.connect.common.DataTypeProtoConverter +import 
org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType /** @@ -531,10 +533,8 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging */ @scala.annotation.varargs def textFile(paths: String*): Dataset[String] = { - // scalastyle:off throwerror - // TODO: this method can be supported and should be included in the client API. - throw new NotImplementedError() - // scalastyle:on throwerror + assertNoSpecifiedSchema("textFile") + text(paths: _*).select("value").as(StringEncoder) } private def assertSourceFormatSpecified(): Unit = { @@ -556,6 +556,15 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging } } + /** + * A convenient function for schema validation in APIs. + */ + private def assertNoSpecifiedSchema(operation: String): Unit = { + if (userSpecifiedSchema.nonEmpty) { + throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation) + } + } + /////////////////////////////////////////////////////////////////////////////////////// // Builder pattern config options /////////////////////////////////////////////////////////////////////////////////////// diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 5aa5500116d..8665c067326 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -161,6 +161,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("textFile") { + val testDataPath = java.nio.file.Paths + .get( + IntegrationTestUtils.sparkHome, + "connector", + "connect", + "common", + "src", + "test", + "resources", + "query-tests", + "test-data", + "people.txt") + .toAbsolutePath + val result = 
spark.read.textFile(testDataPath.toString).collect() + val expected = Array("Michael, 29", "Andy, 30", "Justin, 19") + assert(result.length == 3) + assert(result === expected) + } + test("write table") { withTable("myTable") { val df = spark.range(10).limit(3) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org