This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new faedcd91d55 [SPARK-41970] Introduce SparkPath for type safety
faedcd91d55 is described below
commit faedcd91d554a00fc76116a0c188752cf036f907
Author: David Lewis <[email protected]>
AuthorDate: Thu Jan 19 10:05:51 2023 +0800
[SPARK-41970] Introduce SparkPath for type safety
### What changes were proposed in this pull request?
This PR proposes a strongly typed `SparkPath` that encapsulates a
url-encoded string. It has helper methods for creating hadoop `Path`s, `URI`s,
and url-encoded strings.
The intent is to identify and fix various bugs in the way that Spark
handles these paths. To do this we introduced the `SparkPath` type to
`PartitionedFile` (a widely used class) and then fixed the resulting compile
errors, fixing several encoding bugs along the way.
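In use, the `SparkPath` added by this patch looks like this (a sketch; the
path literal is illustrative):
```scala
import org.apache.spark.paths.SparkPath

// From an unencoded hadoop Path string: the space gets url-encoded internally.
val p = SparkPath.fromPathString("/tmp/path with space/a")
p.urlEncoded // "/tmp/path%20with%20space/a"
p.toUri      // java.net.URI (already encoded)
p.toPath     // org.apache.hadoop.fs.Path

// From a string that is already url-encoded: stored as-is.
val q = SparkPath.fromUrlString("/tmp/path%20with%20space/a")
assert(p == q)
```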
### Why are the changes needed?
Given `val str = "s3://bucket/path with space/a"`, there is a difference
between `new Path(str)` and `new Path(new URI(str))`, and thus a difference
between `new URI(str)` and `new Path(str).toUri`.
Both `URI` and `Path` are symmetric in construction and `toString`, but they
are not interchangeable. Spark conflates the two representations (url-encoded
vs. not), and this PR uses types to disambiguate them.
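For example, with the url-encoded form of that string (a minimal illustration,
not part of the patch):
```scala
import java.net.URI
import org.apache.hadoop.fs.Path

val encoded = "s3://bucket/path%20with%20space/a"

// Path(String) treats its argument as unencoded, URI(String) as encoded:
new Path(encoded).toString          // s3://bucket/path%20with%20space/a ("%20" kept literally)
new Path(new URI(encoded)).toString // s3://bucket/path with space/a     (decoded)

// So round-tripping the encoded string through Path(String) double-encodes it:
new Path(encoded).toUri.toString    // s3://bucket/path%2520with%2520space/a
```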
### Does this PR introduce _any_ user-facing change?
This PR proposes changing the public API of `PartitionedFile`, and of various
other methods, in the name of type safety: it needs to be clear to callers of
an API which kind of path string is expected.
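For example, `PartitionedFile` now carries a `SparkPath` instead of a raw
`String`, so the encoding is explicit at every call site (a sketch; the path
literal is illustrative):
```scala
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Before: PartitionedFile(InternalRow.empty, "/data/part-0", 0, 10)
// After: the caller states how its path string is encoded.
val file = PartitionedFile(
  InternalRow.empty,
  SparkPath.fromPathString("/data/part-0"), // an unencoded hadoop Path string
  0,  // start
  10) // length

file.urlEncodedPath // url-encoded String (e.g. for logs and error messages)
file.toPath         // org.apache.hadoop.fs.Path (for filesystem access)
```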
### How was this patch tested?
We rely on existing tests and update the default temp path creation to
include paths with spaces.
Closes #39488 from databricks-david-lewis/SPARK_PATH.
Authored-by: David Lewis <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/avro/AvroFileFormat.scala | 5 +-
.../sql/v2/avro/AvroPartitionReaderFactory.scala | 7 +--
.../apache/spark/sql/avro/AvroRowReaderSuite.scala | 6 +--
.../org/apache/spark/sql/avro/AvroSuite.scala | 3 +-
.../apache/spark/deploy/worker/WorkerWatcher.scala | 2 +-
.../scala/org/apache/spark/paths/SparkPath.scala | 55 ++++++++++++++++++++++
.../scala/org/apache/spark/rpc/RpcAddress.scala | 2 +-
.../spark/ml/source/image/ImageFileFormat.scala | 6 +--
scalastyle-config.xml | 8 ++++
.../main/scala/org/apache/spark/sql/Dataset.scala | 7 +--
.../spark/sql/execution/DataSourceScanExec.scala | 4 +-
.../apache/spark/sql/execution/FileRelation.scala | 4 +-
.../spark/sql/execution/PartitionedFileUtil.scala | 5 +-
.../execution/datasources/CatalogFileIndex.scala | 3 +-
.../sql/execution/datasources/DataSource.scala | 3 +-
.../sql/execution/datasources/FileIndex.scala | 3 +-
.../sql/execution/datasources/FileScanRDD.scala | 21 ++++++---
.../datasources/HadoopFileLinesReader.scala | 4 +-
.../datasources/HadoopFileWholeTextReader.scala | 4 +-
.../execution/datasources/HadoopFsRelation.scala | 3 +-
.../datasources/PartitioningAwareFileIndex.scala | 5 +-
.../datasources/binaryfile/BinaryFileFormat.scala | 3 +-
.../execution/datasources/csv/CSVDataSource.scala | 3 +-
.../execution/datasources/csv/CSVFileFormat.scala | 2 +-
.../datasources/json/JsonDataSource.scala | 5 +-
.../execution/datasources/orc/OrcFileFormat.scala | 3 +-
.../datasources/parquet/ParquetFileFormat.scala | 4 +-
.../datasources/v2/FilePartitionReader.scala | 8 ++--
.../sql/execution/datasources/v2/FileScan.scala | 2 +-
.../v2/csv/CSVPartitionReaderFactory.scala | 2 +-
.../v2/orc/OrcPartitionReaderFactory.scala | 10 ++--
.../v2/parquet/ParquetPartitionReaderFactory.scala | 11 ++---
.../execution/streaming/FileStreamSinkLog.scala | 17 ++++---
.../sql/execution/streaming/FileStreamSource.scala | 35 ++++++++------
.../spark/sql/FileBasedDataSourceSuite.scala | 3 +-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 2 +-
.../datasources/FileSourceStrategySuite.scala | 9 ++--
.../datasources/HadoopFileLinesReaderSuite.scala | 7 ++-
.../binaryfile/BinaryFileFormatSuite.scala | 4 +-
.../spark/sql/streaming/FileStreamSinkSuite.scala | 9 ++--
.../sql/streaming/FileStreamSourceSuite.scala | 45 +++++++++---------
.../apache/spark/sql/hive/orc/OrcFileFormat.scala | 5 +-
42 files changed, 216 insertions(+), 133 deletions(-)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index a13e0624f35..3e16e121081 100755
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.avro
import java.io._
-import java.net.URI
import scala.util.control.NonFatal
@@ -96,9 +95,9 @@ private[sql] class AvroFileFormat extends FileFormat
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
- if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
+ if (parsedOptions.ignoreExtension || file.urlEncodedPath.endsWith(".avro")) {
val reader = {
- val in = new FsInput(new Path(new URI(file.filePath)), conf)
+ val in = new FsInput(file.toPath, conf)
try {
val datumReader = userProvidedSchema match {
case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 3ad63f113fe..cc7bd180e84 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -16,14 +16,11 @@
*/
package org.apache.spark.sql.v2.avro
-import java.net.URI
-
import scala.util.control.NonFatal
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
-import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -62,9 +59,9 @@ case class AvroPartitionReaderFactory(
val conf = broadcastedConf.value.value
val userProvidedSchema = options.schema
- if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
+ if (options.ignoreExtension || partitionedFile.urlEncodedPath.endsWith(".avro")) {
val reader = {
- val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
+ val in = new FsInput(partitionedFile.toPath, conf)
try {
val datumReader = userProvidedSchema match {
case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
index 53064371b2a..15b1e5ecf88 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
@@ -18,13 +18,11 @@
package org.apache.spark.sql.avro
import java.io._
-import java.net.URI
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@@ -62,8 +60,8 @@ class AvroRowReaderSuite
case BatchScanExec(_, f: AvroScan, _, _, _, _, _) => f
}
val filePath = fileScan.get.fileIndex.inputFiles(0)
- val fileSize = new File(new URI(filePath)).length
- val in = new FsInput(new Path(new URI(filePath)), new Configuration())
+ val fileSize = new File(filePath.toUri).length
+ val in = new FsInput(filePath.toPath, new Configuration())
val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
val it = new Iterator[InternalRow] with AvroUtils.RowReader {
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f74274b0a3c..d4e85addf95 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2357,7 +2357,8 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
assert(fileScan.get.dataFilters.nonEmpty)
assert(fileScan.get.planInputPartitions().forall { partition =>
partition.asInstanceOf[FilePartition].files.forall { file =>
- file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
+ file.urlEncodedPath.contains("p1=1") &&
+ file.urlEncodedPath.contains("p2=2")
}
})
checkAnswer(df, Row("b", 1, 2))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index b7a5728dd00..deb5bb1a697 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -47,7 +47,7 @@ private[spark] class WorkerWatcher(
private[deploy] var isShutDown = false
// Lets filter events only from the worker's rpc system
- private val expectedAddress = RpcAddress.fromURIString(workerUrl)
+ private val expectedAddress = RpcAddress.fromUrlString(workerUrl)
private def isWorker(address: RpcAddress) = expectedAddress == address
private def exitNonZero() =
diff --git a/core/src/main/scala/org/apache/spark/paths/SparkPath.scala b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala
new file mode 100644
index 00000000000..5bc6233f6cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.paths
+
+import java.net.URI
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+/**
+ * A canonical representation of a file path. This class is intended to provide
+ * type-safety to the way that Spark handles Paths. Paths can be represented as
+ * Strings in multiple ways, which are not always compatible. Spark regularly uses
+ * two: the hadoop Path.toString representation and the java URI.toString representation.
+ */
+case class SparkPath private (private val underlying: String) {
+ def urlEncoded: String = underlying
+ def toUri: URI = new URI(underlying)
+ def toPath: Path = new Path(toUri)
+ override def toString: String = underlying
+}
+
+object SparkPath {
+ /**
+ * Creates a SparkPath from a hadoop Path string.
+ * Please be very sure that the provided string is encoded (or not encoded) in the right way.
+ *
+ * Please see the hadoop Path documentation here:
+ * https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html#Path-java.lang.String-
+ */
+ def fromPathString(str: String): SparkPath = fromPath(new Path(str))
+ def fromPath(path: Path): SparkPath = fromUri(path.toUri)
+ def fromFileStatus(fs: FileStatus): SparkPath = fromPath(fs.getPath)
+
+ /**
+ * Creates a SparkPath from a url-encoded string.
+ * Note: It is the responsibility of the caller to ensure that str is a valid url-encoded string.
+ */
+ def fromUrlString(str: String): SparkPath = SparkPath(str)
+ def fromUri(uri: URI): SparkPath = fromUrlString(uri.toString)
+}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 9b0739c9447..675dc24206a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -39,7 +39,7 @@ private[spark] case class RpcAddress(_host: String, port: Int) {
private[spark] object RpcAddress {
/** Return the [[RpcAddress]] represented by `uri`. */
- def fromURIString(uri: String): RpcAddress = {
+ def fromUrlString(uri: String): RpcAddress = {
val uriObj = new java.net.URI(uri)
RpcAddress(uriObj.getHost, uriObj.getPort)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
index 0995df51c64..206ce6f0675 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.source.image
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.ml.image.ImageSchema
@@ -71,8 +71,8 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
Iterator(emptyUnsafeRow)
} else {
- val origin = file.filePath
- val path = new Path(origin)
+ val origin = file.urlEncodedPath
+ val path = file.toPath
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val stream = fs.open(path)
val bytes = try {
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index f34b5d55e42..3dcb03b13fd 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -437,4 +437,12 @@ This file is divided into 3 sections:
Use org.apache.spark.util.Utils.createTempDir instead.
</customMessage>
</check>
+
+ <check customId="pathfromuri" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">new Path\(new URI\(</parameter></parameters>
+ <customMessage><![CDATA[
+ Are you sure that this string is uri encoded? Please be careful when converting hadoop Paths
+ and URIs to and from String. If possible, please use SparkPath.
+ ]]></customMessage>
+ </check>
</scalastyle>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c8e2a48859d..88c4fe511a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.function._
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.api.r.RRDD
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -3924,18 +3925,18 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def inputFiles: Array[String] = {
- val files: Seq[String] = queryExecution.optimizedPlan.collect {
+ val files: Seq[SparkPath] = queryExecution.optimizedPlan.collect {
case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
case r: HiveTableRelation =>
- r.tableMeta.storage.locationUri.map(_.toString).toArray
+ r.tableMeta.storage.locationUri.map(SparkPath.fromUri).toArray
case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
_, _, _, _) =>
table.fileIndex.inputFiles
}.flatten
- files.toSet.toArray
+ files.iterator.map(_.urlEncoded).toSet.toArray
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8dda88e86c0..0f4b8c563d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -632,8 +632,8 @@ case class FileSourceScanExec(
}
}.groupBy { f =>
BucketingUtils
- .getBucketId(new Path(f.filePath).getName)
- .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
+ .getBucketId(f.toPath.getName)
+ .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
}
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
index a299fed7fd1..6a832b784fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import org.apache.spark.paths.SparkPath
+
/**
* An interface for relations that are backed by files. When a class implements this interface,
* the list of paths that it returns will be returned to a user who calls `inputPaths` on any
@@ -24,5 +26,5 @@ package org.apache.spark.sql.execution
*/
trait FileRelation {
/** Returns the list of files that will be read when scanning this relation. */
- def inputFiles: Array[String]
+ def inputFiles: Array[SparkPath]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index 4cccd4132e9..fd5f2f25c0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
@@ -36,7 +37,7 @@ object PartitionedFileUtil {
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
- PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
+ PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
file.getModificationTime, file.getLen)
}
} else {
@@ -49,7 +50,7 @@ object PartitionedFileUtil {
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
- PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
+ PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
file.getModificationTime, file.getLen)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 727b33018fb..f12b72f6867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -22,6 +22,7 @@ import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions._
@@ -94,7 +95,7 @@ class CatalogFileIndex(
}
}
- override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+ override def inputFiles: Array[SparkPath] = filterPartitions(Nil).inputFiles
// `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ad26ee21c2c..94dd3bc0bd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -70,7 +70,8 @@ import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
*
* @param paths A list of file system paths that hold data. These will be globbed before if
* the "__globPaths__" option is true, and will be qualified. This option only works
- * when reading from a [[FileFormat]].
+ * when reading from a [[FileFormat]]. These paths are expected to be hadoop [[Path]]
+ * strings.
* @param userSpecifiedSchema An optional specification of the schema of the data. When present
* we skip attempting to infer the schema.
* @param partitionColumns A list of column names that the relation is partitioned by. This list is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 7bfc781797e..d9a63edca73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs._
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
@@ -62,7 +63,7 @@ trait FileIndex {
* Returns the list of files that will be read when scanning this relation. This call may be
* very expensive for large tables.
*/
- def inputFiles: Array[String]
+ def inputFiles: Array[SparkPath]
/** Refresh any cached file listings */
def refresh(): Unit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 827d41dd096..0ccf72823f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources
import java.io.{Closeable, FileNotFoundException, IOException}
+import java.net.URI
import scala.util.control.NonFatal
@@ -25,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
@@ -51,12 +53,17 @@ import org.apache.spark.util.NextIterator
*/
case class PartitionedFile(
partitionValues: InternalRow,
- filePath: String,
+ filePath: SparkPath,
start: Long,
length: Long,
@transient locations: Array[String] = Array.empty,
modificationTime: Long = 0L,
fileSize: Long = 0L) {
+
+ def pathUri: URI = filePath.toUri
+ def toPath: Path = filePath.toPath
+ def urlEncodedPath: String = filePath.urlEncoded
+
override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values:
$partitionValues"
}
@@ -140,14 +147,14 @@ class FileScanRDD(
private def updateMetadataRow(): Unit =
if (metadataColumns.nonEmpty && currentFile != null) {
updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
- new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime)
+ currentFile.toPath, currentFile.fileSize, currentFile.modificationTime)
}
/**
* Create an array of constant column vectors containing all required metadata columns
*/
private def createMetadataColumnVector(c: ColumnarBatch): Array[ColumnVector] = {
- val path = new Path(currentFile.filePath)
+ val path = currentFile.toPath
metadataColumns.map(_.name).map {
case FILE_PATH =>
val columnVector = new ConstantColumnVector(c.numRows(), StringType)
@@ -223,7 +230,8 @@ class FileScanRDD(
updateMetadataRow()
logInfo(s"Reading File $currentFile")
// Sets InputFileBlockHolder for the file block's information
- InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+ InputFileBlockHolder
+ .set(currentFile.urlEncodedPath, currentFile.start, currentFile.length)
resetCurrentIterator()
if (ignoreMissingFiles || ignoreCorruptFiles) {
@@ -278,12 +286,13 @@ class FileScanRDD(
} catch {
case e: SchemaColumnConvertNotSupportedException =>
throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
- currentFile.filePath, e.getColumn, e.getLogicalType, e.getPhysicalType, e)
+ currentFile.urlEncodedPath, e.getColumn, e.getLogicalType, e.getPhysicalType, e)
case sue: SparkUpgradeException => throw sue
case NonFatal(e) =>
e.getCause match {
case sue: SparkUpgradeException => throw sue
- case _ => throw QueryExecutionErrors.cannotReadFilesError(e, currentFile.filePath)
+ case _ =>
+ throw QueryExecutionErrors.cannotReadFilesError(e, currentFile.urlEncodedPath)
}
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index b5e276bd421..5ec17290c37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -18,10 +18,8 @@
package org.apache.spark.sql.execution.datasources
import java.io.Closeable
-import java.net.URI
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
@@ -48,7 +46,7 @@ class HadoopFileLinesReader(
private val _iterator = {
val fileSplit = new FileSplit(
- new Path(new URI(file.filePath)),
+ file.toPath,
file.start,
file.length,
// The locality is decided by `getPreferredLocations` in `FileScanRDD`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
index a48001f04a9..17649f62d84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -18,10 +18,8 @@
package org.apache.spark.sql.execution.datasources
import java.io.Closeable
-import java.net.URI
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
@@ -37,7 +35,7 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration)
extends Iterator[Text] with Closeable {
private val _iterator = {
val fileSplit = new CombineFileSplit(
- Array(new Path(new URI(file.filePath))),
+ Array(file.toPath),
Array(file.start),
Array(file.length),
// The locality is decided by `getPreferredLocations` in `FileScanRDD`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index fd1824055dc..bd04ddb2ec6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
@@ -70,5 +71,5 @@ case class HadoopFsRelation(
}
- override def inputFiles: Array[String] = location.inputFiles
+ override def inputFiles: Array[SparkPath] = location.inputFiles
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 137fd6fe1ac..2d8c7b19507 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.internal.Logging
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
@@ -136,8 +137,8 @@ abstract class PartitioningAwareFileIndex(
}
/** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- allFiles().map(_.getPath.toUri.toString).toArray
+ override def inputFiles: Array[SparkPath] =
+ allFiles().map(SparkPath.fromFileStatus).toArray
override def sizeInBytes: Long = allFiles().map(_.getLen).sum
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index 43512ff5ac8..ba6d351761e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.binaryfile
-import java.net.URI
import java.sql.Timestamp
import com.google.common.io.{ByteStreams, Closeables}
@@ -101,7 +100,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
file: PartitionedFile => {
- val path = new Path(new URI(file.filePath))
+ val path = file.toPath
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
if (filterFuncs.forall(_.apply(status))) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index d8fa768a604..99d43953c4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.csv
-import java.net.URI
import java.nio.charset.{Charset, StandardCharsets}
import com.univocity.parsers.csv.CsvParser
@@ -179,7 +178,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
headerChecker: CSVHeaderChecker,
requiredSchema: StructType): Iterator[InternalRow] = {
UnivocityParser.parseStream(
- CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
+ CodecStreams.createInputStreamWithCloseResource(conf, file.toPath),
parser,
headerChecker,
requiredSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 93679516a8c..2a6c209ff0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -128,7 +128,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
- schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
+ schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
CSVDataSource(parsedOptions).readFile(
conf,
file,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 2f4cd468457..7c98c31bba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.json
import java.io.InputStream
-import java.net.URI
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import com.google.common.io.ByteStreams
@@ -211,7 +210,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
schema: StructType): Iterator[InternalRow] = {
def partitionedFileString(ignored: Any): UTF8String = {
Utils.tryWithResource {
- CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
+ CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
} { inputStream =>
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
@@ -227,6 +226,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
parser.options.columnNameOfCorruptRecord)
safeParser.parse(
- CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
+ CodecStreams.createInputStreamWithCloseResource(conf, file.toPath))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 6a58513c346..cb18566e848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc
import java.io._
-import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -164,7 +163,7 @@ class OrcFileFormat
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 80b6791d8fa..6b4651e3260 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.net.URI
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}
@@ -200,7 +198,7 @@ class ParquetFileFormat
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 782c1f50d80..7159bc6de3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -66,7 +66,8 @@ class FilePartitionReader[T](
} catch {
case e: SchemaColumnConvertNotSupportedException =>
throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
- currentReader.file.filePath, e.getColumn, e.getLogicalType, e.getPhysicalType, e)
+ currentReader.file.urlEncodedPath,
+ e.getColumn, e.getLogicalType, e.getPhysicalType, e)
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file:
$currentReader", e)
@@ -75,7 +76,8 @@ class FilePartitionReader[T](
case NonFatal(e) =>
e.getCause match {
case sue: SparkUpgradeException => throw sue
- case _ => throw QueryExecutionErrors.cannotReadFilesError(e, currentReader.file.filePath)
+ case _ => throw QueryExecutionErrors.cannotReadFilesError(e,
+ currentReader.file.urlEncodedPath)
}
}
if (hasNext) {
@@ -101,7 +103,7 @@ class FilePartitionReader[T](
logInfo(s"Reading file $reader")
// Sets InputFileBlockHolder for the file block's information
val file = reader.file
- InputFileBlockHolder.set(file.filePath, file.start, file.length)
+ InputFileBlockHolder.set(file.urlEncodedPath, file.start, file.length)
reader
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 9b6f9932866..0cfb55ab407 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -163,7 +163,7 @@ trait FileScan extends Scan
}
if (splitFiles.length == 1) {
- val path = new Path(splitFiles(0).filePath)
+ val path = splitFiles(0).toPath
if (!isSplitable(path) && splitFiles(0).length > sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
logWarning(s"Loading one large unsplittable file ${path.toString} with
only one " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index f8a17c8eaa8..37f6ae4aaa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -61,7 +61,7 @@ case class CSVPartitionReaderFactory(
val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
- schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile)
+ schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
val iter = CSVDataSource(options).readFile(
conf,
file,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 4f93a67cc46..2b7bdae6b31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.v2.orc
-import java.net.URI
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
@@ -86,7 +84,7 @@ case class OrcPartitionReaderFactory(
if (aggregation.nonEmpty) {
return buildReaderWithAggregates(file, conf)
}
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
val resultedColPruneInfo = OrcUtils.requestedColumnIds(
@@ -127,7 +125,7 @@ case class OrcPartitionReaderFactory(
if (aggregation.nonEmpty) {
return buildColumnarReaderWithAggregates(file, conf)
}
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
val resultedColPruneInfo = OrcUtils.requestedColumnIds(
@@ -181,7 +179,7 @@ case class OrcPartitionReaderFactory(
private def buildReaderWithAggregates(
file: PartitionedFile,
conf: Configuration): PartitionReader[InternalRow] = {
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
new PartitionReader[InternalRow] {
private var hasNext = true
private lazy val row: InternalRow = {
@@ -209,7 +207,7 @@ case class OrcPartitionReaderFactory(
private def buildColumnarReaderWithAggregates(
file: PartitionedFile,
conf: Configuration): PartitionReader[ColumnarBatch] = {
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
new PartitionReader[ColumnarBatch] {
private var hasNext = true
private lazy val batch: ColumnarBatch = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 121ebe1cfa2..5951c1d8dd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -16,10 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.parquet
-import java.net.URI
import java.time.ZoneId
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -89,7 +87,7 @@ case class ParquetPartitionReaderFactory(
private def getFooter(file: PartitionedFile): ParquetMetadata = {
val conf = broadcastedConf.value.value
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
if (aggregation.isEmpty) {
ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
@@ -132,7 +130,8 @@ case class ParquetPartitionReaderFactory(
val footer = getFooter(file)
if (footer != null && footer.getBlocks.size > 0) {
- ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath, dataSchema,
+ ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath,
+ dataSchema,
partitionSchema, aggregation.get, readDataSchema, file.partitionValues,
getDatetimeRebaseSpec(footer.getFileMetaData))
} else {
@@ -175,7 +174,7 @@ case class ParquetPartitionReaderFactory(
private val batch: ColumnarBatch = {
val footer = getFooter(file)
if (footer != null && footer.getBlocks.size > 0) {
- val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath,
+ val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath,
dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues,
getDatetimeRebaseSpec(footer.getFileMetaData))
AggregatePushDownUtils.convertAggregatesRowToBatch(
@@ -209,7 +208,7 @@ case class ParquetPartitionReaderFactory(
RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = {
val conf = broadcastedConf.value.value
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
lazy val footerFileMetaData = getFooter(file).getFileMetaData
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 2d70d95c685..94ba8b8aa51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,12 +17,11 @@
package org.apache.spark.sql.execution.streaming
-import java.net.URI
-
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
@@ -30,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
* The status of a file outputted by [[FileStreamSink]]. A file is visible only if it appears in
* the sink log and its action is not "delete".
*
- * @param path the file path.
+ * @param path the file path as a uri-encoded string.
* @param size the file size.
* @param isDir whether this file is a directory.
* @param modificationTime the file last modification time.
@@ -46,17 +45,23 @@ case class SinkFileStatus(
blockReplication: Int,
blockSize: Long,
action: String) {
+ def sparkPath: SparkPath = SparkPath.fromUrlString(path)
def toFileStatus: FileStatus = {
new FileStatus(
- size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
+ size,
+ isDir,
+ blockReplication,
+ blockSize,
+ modificationTime,
+ SparkPath.fromUrlString(path).toPath)
}
}
object SinkFileStatus {
def apply(f: FileStatus): SinkFileStatus = {
SinkFileStatus(
- path = f.getPath.toUri.toString,
+ path = SparkPath.fromPath(f.getPath).urlEncoded,
size = f.getLen,
isDir = f.isDirectory,
modificationTime = f.getModificationTime,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5baf3d29a49..6eb2ffef44e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.streaming
-import java.net.URI
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit._
@@ -28,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming
@@ -109,16 +109,16 @@ class FileStreamSource(
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
- private var allFilesForTriggerAvailableNow: Seq[(String, Long)] = _
+ private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
metadataLog.restore().foreach { entry =>
- seenFiles.add(entry.path, entry.timestamp)
+ seenFiles.add(entry.sparkPath, entry.timestamp)
}
seenFiles.purge()
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs =
$maxFileAgeMs")
- private var unreadFiles: Seq[(String, Long)] = _
+ private var unreadFiles: Seq[(SparkPath, Long)] = _
/**
* Returns the maximum offset that can be retrieved from the source.
@@ -193,7 +193,7 @@ class FileStreamSource(
metadataLogCurrentOffset += 1
val fileEntries = batchFiles.map { case (p, timestamp) =>
- FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset)
+ FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset)
}.toArray
if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
logInfo(s"Log offset set to $metadataLogCurrentOffset with
${batchFiles.size} new files")
@@ -239,7 +239,7 @@ class FileStreamSource(
val newDataSource =
DataSource(
sparkSession,
- paths = files.map(f => new Path(new URI(f.path)).toString),
+ paths = files.map(_.sparkPath.toPath.toString),
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
className = fileFormatClassName,
@@ -286,7 +286,7 @@ class FileStreamSource(
/**
* Returns a list of files found, sorted by their timestamp.
*/
- private def fetchAllFiles(): Seq[(String, Long)] = {
+ private def fetchAllFiles(): Seq[(SparkPath, Long)] = {
val startTime = System.nanoTime
var allFiles: Seq[FileStatus] = null
@@ -318,7 +318,7 @@ class FileStreamSource(
}
val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
- (status.getPath.toUri.toString, status.getModificationTime)
+ (SparkPath.fromFileStatus(status), status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
@@ -368,7 +368,12 @@ object FileStreamSource {
val DISCARD_UNSEEN_FILES_RATIO = 0.2
val MAX_CACHED_UNSEEN_FILES = 10000
- case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable
+ case class FileEntry(
+ path: String, // uri-encoded path string
+ timestamp: Timestamp,
+ batchId: Long) extends Serializable {
+ def sparkPath: SparkPath = SparkPath.fromUrlString(path)
+ }
/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
@@ -388,12 +393,12 @@ object FileStreamSource {
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L
- @inline private def stripPathIfNecessary(path: String) = {
- if (fileNameOnly) new Path(new URI(path)).getName else path
+ @inline private def stripPathIfNecessary(path: SparkPath) = {
+ if (fileNameOnly) path.toPath.getName else path.urlEncoded
}
/** Add a new file to the map. */
- def add(path: String, timestamp: Timestamp): Unit = {
+ def add(path: SparkPath, timestamp: Timestamp): Unit = {
map.put(stripPathIfNecessary(path), timestamp)
if (timestamp > latestTimestamp) {
latestTimestamp = timestamp
@@ -404,7 +409,7 @@ object FileStreamSource {
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we are still tracking, and we have not seen it before.
*/
- def isNewFile(path: String, timestamp: Timestamp): Boolean = {
+ def isNewFile(path: SparkPath, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
@@ -551,7 +556,7 @@ object FileStreamSource {
}
override protected def cleanTask(entry: FileEntry): Unit = {
- val curPath = new Path(new URI(entry.path))
+ val curPath = entry.sparkPath.toPath
val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
try {
@@ -575,7 +580,7 @@ object FileStreamSource {
extends FileStreamSourceCleaner with Logging {
override protected def cleanTask(entry: FileEntry): Unit = {
- val curPath = new Path(new URI(entry.path))
+ val curPath = entry.sparkPath.toPath
try {
logDebug(s"Removing completed file $curPath")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3b81d215c7f..474de0dacae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -881,7 +881,8 @@ class FileBasedDataSourceSuite extends QueryTest
assert(fileScan.get.dataFilters.nonEmpty)
assert(fileScan.get.planInputPartitions().forall { partition =>
partition.asInstanceOf[FilePartition].files.forall { file =>
- file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
+ file.urlEncodedPath.contains("p1=1") &&
+ file.urlEncodedPath.contains("p2=2")
}
})
checkAnswer(df, Row("b", 1, 2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 86a0c4d1799..2d7cd007bee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1461,7 +1461,7 @@ class SubquerySuite extends QueryTest
partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
- _.files.forall(_.filePath.contains("p=0"))))
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
case _ => false
})
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4dbe619610e..26655c2d95a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkException
+import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -283,10 +284,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession {
test("Locality support for FileScanRDD") {
val partition = FilePartition(0, Array(
- PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0",
"host1")),
- PartitionedFile(InternalRow.empty, "fakePath0", 10, 20, Array("host1",
"host2")),
- PartitionedFile(InternalRow.empty, "fakePath1", 0, 5, Array("host3")),
- PartitionedFile(InternalRow.empty, "fakePath2", 0, 5, Array("host4"))
+ PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10,
Array("host0", "host1")),
+ PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20,
Array("host1", "host2")),
+ PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5,
Array("host3")),
+ PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, Array("host4"))
))
val fakeRDD = new FileScanRDD(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
index 771ddbd6523..b6b89ab3043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.test.SharedSparkSession
@@ -37,7 +38,11 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession {
Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8))
val lines = ranges.flatMap { case (start, length) =>
- val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
+ val file = PartitionedFile(
+ InternalRow.empty,
+ SparkPath.fromPathString(path.getCanonicalPath),
+ start,
+ length)
val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9a374d5c302..1d2e467c94c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -278,7 +278,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
options = Map.empty,
hadoopConf = spark.sessionState.newHadoopConf())
val partitionedFile = mock(classOf[PartitionedFile])
- when(partitionedFile.filePath).thenReturn(fileStatus.getPath.toString)
+ when(partitionedFile.toPath).thenReturn(fileStatus.getPath)
assert(reader(partitionedFile).nonEmpty === expected,
s"Filters $filters applied to $fileStatus should be $expected.")
}
@@ -305,7 +305,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
hadoopConf = spark.sessionState.newHadoopConf()
)
val partitionedFile = mock(classOf[PartitionedFile])
- when(partitionedFile.filePath).thenReturn(file.getPath)
+ when(partitionedFile.toPath).thenReturn(new Path(file.toURI))
val encoder = RowEncoder(requiredSchema).resolveAndBind()
encoder.createDeserializer().apply(reader(partitionedFile).next())
}
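Both mocks above now stub the typed accessor instead of a string. A self-contained sketch of the same stubbing (Mockito, as in the suite; the temp-file name is illustrative only):

    import org.apache.hadoop.fs.Path
    import org.mockito.Mockito.{mock, when}
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    val pf = mock(classOf[PartitionedFile])
    // Building the Path from a URI keeps a space in the file name from being
    // misread as a url-encoding boundary.
    when(pf.toPath).thenReturn(new Path(new java.io.File("/tmp/x y.bin").toURI))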
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 568b1df4c40..8c31d3c7abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.util.stringToFile
@@ -519,7 +520,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
.filter(_.toString.endsWith(".parquet"))
.map(_.getFileName.toString)
.toSet
- val trackingFileNames = tracking.map(new Path(_).getName).toSet
+      val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
// there would be possible to have race condition:
// - some tasks complete while abortJob is being called
@@ -569,7 +570,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
val allFiles = sinkLog.allFiles()
// only files from non-empty partition should be logged
assert(allFiles.length < 10)
- assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+ assert(allFiles.forall(file => fs.exists(file.sparkPath.toPath)))
      // the query should be able to read all rows correctly with metadata log
      val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
@@ -709,14 +710,14 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
// Read with pruning, should read only files in partition dir id=1
checkFileScanPartitions(df.filter("id = 1")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
- assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+ assert(filesToBeRead.map(_.urlEncodedPath).forall(_.contains("/id=1/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
}
// Read with pruning, should read only files in partition dir id=1 and id=2
checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
- assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+ assert(!filesToBeRead.map(_.urlEncodedPath).exists(_.contains("/id=3/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
}
}
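The sink-log assertions above lean on two accessors: `sparkPath.toPath` to recover a Hadoop `Path` from a logged entry, and `urlEncodedPath` for substring checks against the stored string. A round-trip sketch, assuming `SparkPath` exposes the same `urlEncodedPath` accessor used on `PartitionedFile` above:

    import org.apache.spark.paths.SparkPath

    val logged = "file:///data/id=1/part-00000.parquet"  // illustrative entry
    val sparkPath = SparkPath.fromUrlString(logged)
    // Hadoop Path for filesystem calls such as fs.exists(...)
    assert(sparkPath.toPath.getName == "part-00000.parquet")
    // Url-encoded string for pruning-style substring checks
    assert(sparkPath.urlEncodedPath.contains("/id=1/"))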
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9b1e5a9e16e..a8a4df2ad04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.read.streaming.ReadLimit
@@ -1761,69 +1762,69 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
- map.add("a", 5)
+ map.add(sp("a"), 5)
assert(map.size == 1)
map.purge()
assert(map.size == 1)
    // Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
- map.add("b", 15)
+ map.add(sp("b"), 15)
assert(map.size == 2)
map.purge()
assert(map.size == 2)
    // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
- map.add("c", 16)
+ map.add(sp("c"), 16)
assert(map.size == 3)
map.purge()
assert(map.size == 2)
// Override existing entry shouldn't change the size
- map.add("c", 25)
+ map.add(sp("c"), 25)
assert(map.size == 2)
// Not a new file because we have seen c before
- assert(!map.isNewFile("c", 20))
+ assert(!map.isNewFile(sp("c"), 20))
// Not a new file because timestamp is too old
- assert(!map.isNewFile("d", 5))
+ assert(!map.isNewFile(sp("d"), 5))
// Finally a new file: never seen and not too old
- assert(map.isNewFile("e", 20))
+ assert(map.isNewFile(sp("e"), 20))
}
test("SeenFilesMap with fileNameOnly = true") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
- map.add("file:///a/b/c/d", 5)
- map.add("file:///a/b/c/e", 5)
+ map.add(sp("file:///a/b/c/d"), 5)
+ map.add(sp("file:///a/b/c/e"), 5)
assert(map.size === 2)
- assert(!map.isNewFile("d", 5))
- assert(!map.isNewFile("file:///d", 5))
- assert(!map.isNewFile("file:///x/d", 5))
- assert(!map.isNewFile("file:///x/y/d", 5))
+ assert(!map.isNewFile(sp("d"), 5))
+ assert(!map.isNewFile(sp("file:///d"), 5))
+ assert(!map.isNewFile(sp("file:///x/d"), 5))
+ assert(!map.isNewFile(sp("file:///x/y/d"), 5))
- map.add("s3:///bucket/d", 5)
- map.add("s3n:///bucket/d", 5)
- map.add("s3a:///bucket/d", 5)
+ map.add(sp("s3:///bucket/d"), 5)
+ map.add(sp("s3n:///bucket/d"), 5)
+ map.add(sp("s3a:///bucket/d"), 5)
assert(map.size === 2)
}
test("SeenFilesMap should only consider a file old if it is earlier than
last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
- map.add("a", 20)
+ map.add(sp("a"), 20)
assert(map.size == 1)
    // Timestamp 5 should still considered a new file because purge time should be 0
- assert(map.isNewFile("b", 9))
- assert(map.isNewFile("b", 10))
+ assert(map.isNewFile(sp("b"), 9))
+ assert(map.isNewFile(sp("b"), 10))
    // Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
map.purge()
- assert(!map.isNewFile("b", 9))
- assert(map.isNewFile("b", 10))
+ assert(!map.isNewFile(sp("b"), 9))
+ assert(map.isNewFile(sp("b"), 10))
}
test("do not recheck that files exist during getBatch") {
@@ -2197,7 +2198,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
assert(files.forall(_.batchId == batchId))
-          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+ val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
          val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
            .map(_.getCanonicalPath)
assert(actualInputFiles === expectedInputFiles)
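One subtlety in the `fileNameOnly = true` test above: adding the s3, s3n, and s3a variants of `.../bucket/d` leaves the map at size 2 because only the final path component serves as the key. A sketch of that keying rule, assuming `toPath` parses the url-encoded string as a URI:

    import org.apache.spark.paths.SparkPath.{fromUrlString => sp}

    // All three scheme variants share the file name "d".
    val names = Seq("s3:///bucket/d", "s3n:///bucket/d", "s3a:///bucket/d")
      .map(s => sp(s).toPath.getName)
    assert(names.distinct == Seq("d"))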
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index aff014261ba..a9314397dcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive.orc
-import java.net.URI
import java.util.Properties
import scala.collection.JavaConverters._
@@ -152,7 +151,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
- val filePath = new Path(new URI(file.filePath))
+ val filePath = file.toPath
      // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
      // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -166,7 +165,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val orcRecordReader = {
val job = Job.getInstance(conf)
- FileInputFormat.setInputPaths(job, file.filePath)
+ FileInputFormat.setInputPaths(job, file.urlEncodedPath)
// Custom OrcRecordReader is used to get
// ObjectInspector during recordReader creation itself and can
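The ORC reader above shows the two typed escape hatches side by side: `file.toPath` where a Hadoop `Path` is wanted directly, and `file.urlEncodedPath` where a string-taking API (here `FileInputFormat.setInputPaths`) will parse the value itself. A minimal sketch of that split, assuming only those two accessors on `PartitionedFile`:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    def configureRead(file: PartitionedFile, conf: Configuration): Job = {
      val filePath: Path = file.toPath        // typed; no manual URI round-trip
      val fs = filePath.getFileSystem(conf)   // e.g. for probing the file footer
      val job = Job.getInstance(conf)
      // Pass the url-encoded form so the string parse on the other side
      // reconstructs the same path, even when it contains spaces.
      FileInputFormat.setInputPaths(job, file.urlEncodedPath)
      job
    }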
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]