This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 3b4fd1d0831 [SPARK-42757][CONNECT] Implement textFile for DataFrameReader
3b4fd1d0831 is described below

commit 3b4fd1d083110a150d83ac249f7e3c7dc1af5e60
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Wed Mar 15 08:41:27 2023 +0900

    [SPARK-42757][CONNECT] Implement textFile for DataFrameReader
    
    ### What changes were proposed in this pull request?
    This PR implements textFile for DataFrameReader in the Spark Connect Scala client.
    
    ### Why are the changes needed?
    Improves API coverage: the Connect client's DataFrameReader previously threw NotImplementedError for textFile.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds a new user-facing method: DataFrameReader.textFile.
    
    ### How was this patch tested?
    Added a new unit test ("textFile" in ClientE2ETestSuite).
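    
    Example usage (a minimal sketch mirroring the new test below; the file
    path is illustrative):
    
        // textFile reads each line of the given text file(s) into a Dataset[String].
        val ds: Dataset[String] = spark.read.textFile("/path/to/people.txt")
        ds.collect()  // e.g. Array("Michael, 29", "Andy, 30", "Justin, 19")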
    
    Closes #40377 from panbingkun/connect_textFile.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 0b8c1482a9504fd3d4ac0245b068072df4ebf427)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/DataFrameReader.scala | 17 +++++++++++++----
 .../org/apache/spark/sql/ClientE2ETestSuite.scala    | 20 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ad921bcc4e3..193eb4faaab 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -24,8 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Stable
 import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -531,10 +533,8 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
    */
   @scala.annotation.varargs
   def textFile(paths: String*): Dataset[String] = {
-    // scalastyle:off throwerror
-    // TODO: this method can be supported and should be included in the client API.
-    throw new NotImplementedError()
-    // scalastyle:on throwerror
+    assertNoSpecifiedSchema("textFile")
+    text(paths: _*).select("value").as(StringEncoder)
   }
 
   private def assertSourceFormatSpecified(): Unit = {
@@ -556,6 +556,15 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
+  /**
+   * A convenient function for schema validation in APIs.
+   */
+  private def assertNoSpecifiedSchema(operation: String): Unit = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+    }
+  }
+
  ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
  ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 5aa5500116d..8665c067326 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -161,6 +161,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }
   }
 
+  test("textFile") {
+    val testDataPath = java.nio.file.Paths
+      .get(
+        IntegrationTestUtils.sparkHome,
+        "connector",
+        "connect",
+        "common",
+        "src",
+        "test",
+        "resources",
+        "query-tests",
+        "test-data",
+        "people.txt")
+      .toAbsolutePath
+    val result = spark.read.textFile(testDataPath.toString).collect()
+    val expected = Array("Michael, 29", "Andy, 30", "Justin, 19")
+    assert(result.length == 3)
+    assert(result === expected)
+  }
+
   test("write table") {
     withTable("myTable") {
       val df = spark.range(10).limit(3)
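
A note on the new assertNoSpecifiedSchema guard above: as with the sql
module's DataFrameReader, textFile rejects a user-specified schema. A
minimal sketch of the expected behavior in a ScalaTest suite (assuming
userSpecifiedSchemaUnsupportedError surfaces as an AnalysisException, as
it does in the sql module; the path is illustrative):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Supplying a schema before calling textFile should trip the new assertion.
    val ex = intercept[AnalysisException] {
      spark.read
        .schema(StructType(StructField("value", StringType) :: Nil))
        .textFile("/path/to/people.txt")
    }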

