This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fcf340a1de3 [SPARK-45912][SQL] Enhancement of XSDToSchema API: Change
to HDFS API for cloud storage accessibility
fcf340a1de3 is described below
commit fcf340a1de371ce1beb2cf93473ea2f2b793801b
Author: Shujing Yang <[email protected]>
AuthorDate: Fri Nov 17 09:17:57 2023 +0900
[SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API for
cloud storage accessibility
### What changes were proposed in this pull request?
Previously, it utilized `java.nio.file.Path`, which limited file reading to
local file systems only. By changing this to an HDFS-compatible API, we now
enable the XSDToSchema function to access files in cloud storage.
### Why are the changes needed?
We want to enable the XSDToSchema function to access files in cloud storage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43789 from shujingyang-db/xsd_api.
Authored-by: Shujing Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/catalyst/xml/ValidatorUtil.scala | 36 ++++++++++++-------
.../execution/datasources/xml/XSDToSchema.scala | 35 +++++++-----------
.../datasources/xml/util/XSDToSchemaSuite.scala | 41 +++++++++++-----------
3 files changed, 55 insertions(+), 57 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
index f8b546332c2..0d85a512d7e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.catalyst.xml
+import java.io.{File, FileInputStream, InputStream}
import javax.xml.XMLConstants
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.{Schema, SchemaFactory}
@@ -25,28 +26,18 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFiles
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.Logging
/**
* Utilities for working with XSD validation.
*/
-private[sql] object ValidatorUtil {
+private[sql] object ValidatorUtil extends Logging{
// Parsing XSDs may be slow, so cache them by path:
private val cache = CacheBuilder.newBuilder().softValues().build(
new CacheLoader[String, Schema] {
override def load(key: String): Schema = {
- val in = try {
- // Handle case where file exists as specified
- val fs = Utils.getHadoopFileSystem(key, SparkHadoopUtil.get.conf)
- fs.open(new Path(key))
- } catch {
- case _: Throwable =>
- // Handle case where it was added with sc.addFile
- val addFileUrl = SparkFiles.get(key)
- val fs = Utils.getHadoopFileSystem(addFileUrl,
SparkHadoopUtil.get.conf)
- fs.open(new Path(addFileUrl))
- }
+ val in = openSchemaFile(new Path(key))
try {
val schemaFactory =
SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
schemaFactory.newSchema(new StreamSource(in))
@@ -56,6 +47,25 @@ private[sql] object ValidatorUtil {
}
})
+ def openSchemaFile(xsdPath: Path): InputStream = {
+ try {
+ // Handle case where file exists as specified
+ val fs = xsdPath.getFileSystem(SparkHadoopUtil.get.conf)
+ fs.open(xsdPath)
+ } catch {
+ case e: Throwable =>
+ // Handle case where it was added with sc.addFile
+ // When they are added via sc.addFile, they are always downloaded to
local file system
+ logInfo(s"$xsdPath was not found, falling back to look up files added
by Spark")
+ val f = new File(SparkFiles.get(xsdPath.toString))
+ if (f.exists()) {
+ new FileInputStream(f)
+ } else {
+ throw e
+ }
+ }
+ }
+
/**
* Parses the XSD at the given local path and caches it.
*
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
index b0894ed3484..356ffd57698 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
@@ -16,53 +16,42 @@
*/
package org.apache.spark.sql.execution.datasources.xml
-import java.io.{File, FileInputStream, InputStreamReader, StringReader}
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
+import java.io.StringReader
import scala.jdk.CollectionConverters._
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.shaded.org.jline.utils.InputStreamReader
import org.apache.ws.commons.schema._
import org.apache.ws.commons.schema.constants.Constants
-import org.apache.spark.sql.catalyst.xml.XmlOptions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.xml.{ValidatorUtil, XmlOptions}
import org.apache.spark.sql.types._
/**
* Utility to generate a Spark schema from an XSD. Not all XSD schemas are
simple tabular schemas,
* so not all elements or XSDs are supported.
*/
-object XSDToSchema {
+object XSDToSchema extends Logging{
/**
- * Reads a schema from an XSD file.
+ * Reads a schema from an XSD path.
* Note that if the schema consists of one complex parent type which you
want to use as
* the row tag schema, then you will need to extract the schema of the
single resulting
* struct in the resulting StructType, and use its StructType as your schema.
*
- * @param xsdFile XSD file
+ * @param xsdPath XSD path
* @return Spark-compatible schema
*/
- def read(xsdFile: File): StructType = {
+ def read(xsdPath: Path): StructType = {
+ val in = ValidatorUtil.openSchemaFile(xsdPath)
val xmlSchemaCollection = new XmlSchemaCollection()
- xmlSchemaCollection.setBaseUri(xsdFile.getParent)
- val xmlSchema = xmlSchemaCollection.read(
- new InputStreamReader(new FileInputStream(xsdFile),
StandardCharsets.UTF_8))
-
+ xmlSchemaCollection.setBaseUri(xsdPath.getParent.toString)
+ val xmlSchema = xmlSchemaCollection.read(new InputStreamReader(in))
getStructType(xmlSchema)
}
- /**
- * Reads a schema from an XSD file.
- * Note that if the schema consists of one complex parent type which you
want to use as
- * the row tag schema, then you will need to extract the schema of the
single resulting
- * struct in the resulting StructType, and use its StructType as your schema.
- *
- * @param xsdFile XSD file
- * @return Spark-compatible schema
- */
- def read(xsdFile: Path): StructType = read(xsdFile.toFile)
-
/**
* Reads a schema from an XSD as a string.
* Note that if the schema consists of one complex parent type which you
want to use as
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
index 9d8b1eec8f7..434b4655d40 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
@@ -16,7 +16,9 @@
*/
package org.apache.spark.sql.execution.datasources.xml.util
-import java.nio.file.Paths
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.xml.TestUtils._
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
@@ -28,8 +30,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
private val resDir = "test-data/xml-resources/"
test("Basic parsing") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"basket.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"basket.xsd")))
val expectedSchema = buildSchema(
field("basket",
structField(
@@ -40,8 +41,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Relative path parsing") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"include-example/first.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"include-example/first.xsd")))
val expectedSchema = buildSchema(
field("basket",
structField(
@@ -52,8 +52,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Test schema types and attributes") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"catalog.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"catalog.xsd")))
val expectedSchema = buildSchema(
field("catalog",
structField(
@@ -76,23 +75,20 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Test xs:choice nullability") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"choice.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"choice.xsd")))
val expectedSchema = buildSchema(
field("el", structField(field("foo"), field("bar"), field("baz")),
nullable = false))
assert(expectedSchema === parsedSchema)
}
test("Two root elements") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"twoelements.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"twoelements.xsd")))
val expectedSchema = buildSchema(field("bar", nullable = false),
field("foo", nullable = false))
assert(expectedSchema === parsedSchema)
}
test("xs:any schema") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"xsany.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"xsany.xsd")))
val expectedSchema = buildSchema(
field("root",
structField(
@@ -117,8 +113,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Tests xs:long type / Issue 520") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "long.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"long.xsd")))
val expectedSchema = buildSchema(
field("test",
structField(field("userId", LongType, nullable = false)), nullable =
false))
@@ -126,8 +121,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Test xs:decimal type with restriction[fractionalDigits]") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
- "decimal-with-restriction.xsd").replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"decimal-with-restriction.xsd")))
val expectedSchema = buildSchema(
field("decimal_type_3", DecimalType(12, 6), nullable = false),
field("decimal_type_1", DecimalType(38, 18), nullable = false),
@@ -137,8 +131,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Test ref attribute / Issue 617") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"ref-attribute.xsd")
- .replace("file:/", "/")))
+ val parsedSchema = XSDToSchema.read(new Path(testFile(resDir +
"ref-attribute.xsd")))
val expectedSchema = buildSchema(
field(
"book",
@@ -166,8 +159,8 @@ class XSDToSchemaSuite extends SharedSparkSession {
}
test("Test complex content with extension element / Issue 554") {
- val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
- "complex-content-extension.xsd").replace("file:/", "/")))
+ val parsedSchema =
+ XSDToSchema.read(new Path(testFile(resDir +
"complex-content-extension.xsd")))
val expectedSchema = buildSchema(
field(
@@ -184,4 +177,10 @@ class XSDToSchemaSuite extends SharedSparkSession {
)
assert(parsedSchema === expectedSchema)
}
+
+ test("SPARK-45912: Test XSDToSchema when open not found files") {
+ intercept[FileNotFoundException] {
+ XSDToSchema.read(new Path("/path/not/found"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]