This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 3b4fd1d0831 [SPARK-42757][CONNECT] Implement textFile for DataFrameReader
3b4fd1d0831 is described below
commit 3b4fd1d083110a150d83ac249f7e3c7dc1af5e60
Author: panbingkun <[email protected]>
AuthorDate: Wed Mar 15 08:41:27 2023 +0900
[SPARK-42757][CONNECT] Implement textFile for DataFrameReader
### What changes were proposed in this pull request?
This PR implements `textFile` for `DataFrameReader` in the Spark Connect Scala client.
### Why are the changes needed?
API coverage for the Spark Connect Scala client.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new user-facing method: `DataFrameReader.textFile`.
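For reference, a minimal usage sketch (the input path here is hypothetical and not part of this commit):

```scala
// Hypothetical example: textFile reads each line of the given text file(s)
// as a String, returning a Dataset[String] rather than a DataFrame.
val lines: Dataset[String] = spark.read.textFile("/tmp/people.txt")
lines.show()
```

As the diff below shows, `textFile` rejects a user-specified schema via `assertNoSpecifiedSchema`, matching the behavior of the non-Connect `DataFrameReader`.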
### How was this patch tested?
Added a new unit test in `ClientE2ETestSuite`.
Closes #40377 from panbingkun/connect_textFile.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 0b8c1482a9504fd3d4ac0245b068072df4ebf427)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/sql/DataFrameReader.scala | 17 +++++++++++++----
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 20 ++++++++++++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ad921bcc4e3..193eb4faaab 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -24,8 +24,10 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType
/**
@@ -531,10 +533,8 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
*/
@scala.annotation.varargs
def textFile(paths: String*): Dataset[String] = {
- // scalastyle:off throwerror
- // TODO: this method can be supported and should be included in the client API.
- throw new NotImplementedError()
- // scalastyle:on throwerror
+ assertNoSpecifiedSchema("textFile")
+ text(paths: _*).select("value").as(StringEncoder)
}
private def assertSourceFormatSpecified(): Unit = {
@@ -556,6 +556,15 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
}
}
+ /**
+ * A convenient function for schema validation in APIs.
+ */
+ private def assertNoSpecifiedSchema(operation: String): Unit = {
+ if (userSpecifiedSchema.nonEmpty) {
+ throw QueryCompilationErrors.userSpecifiedSchemaUnsupportedError(operation)
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 5aa5500116d..8665c067326 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -161,6 +161,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
}
}
+ test("textFile") {
+ val testDataPath = java.nio.file.Paths
+ .get(
+ IntegrationTestUtils.sparkHome,
+ "connector",
+ "connect",
+ "common",
+ "src",
+ "test",
+ "resources",
+ "query-tests",
+ "test-data",
+ "people.txt")
+ .toAbsolutePath
+ val result = spark.read.textFile(testDataPath.toString).collect()
+ val expected = Array("Michael, 29", "Andy, 30", "Justin, 19")
+ assert(result.length == 3)
+ assert(result === expected)
+ }
+
test("write table") {
withTable("myTable") {
val df = spark.range(10).limit(3)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]