This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 223afea9960c [SPARK-46473][SQL] Reuse `getPartitionedFile` method
223afea9960c is described below
commit 223afea9960c7ef1a4c8654e043e860f6c248185
Author: huangxiaoping <[email protected]>
AuthorDate: Wed Jan 31 22:59:20 2024 -0600
[SPARK-46473][SQL] Reuse `getPartitionedFile` method
### What changes were proposed in this pull request?
Reuse `getPartitionedFile` method to reduce redundant code.
### Why are the changes needed?
Reduce redundant code.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44437 from huangxiaopingRD/SPARK-46473.
Authored-by: huangxiaoping <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../apache/spark/sql/execution/DataSourceScanExec.scala | 2 +-
.../apache/spark/sql/execution/PartitionedFileUtil.scala | 14 +++++++-------
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index b3b2b0eab055..2622eadaefb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -645,7 +645,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
- p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
+ p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen))
}.groupBy { f =>
BucketingUtils
.getBucketId(f.toPath.getName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index b31369b6768e..997859058de1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -33,20 +33,20 @@ object PartitionedFileUtil {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
- val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
- PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
- file.getModificationTime, file.getLen, file.metadata)
+ getPartitionedFile(file, partitionValues, offset, size)
}
} else {
- Seq(getPartitionedFile(file, partitionValues))
+ Seq(getPartitionedFile(file, partitionValues, 0, file.getLen))
}
}
def getPartitionedFile(
file: FileStatusWithMetadata,
- partitionValues: InternalRow): PartitionedFile = {
- val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
- PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
+ partitionValues: InternalRow,
+ start: Long,
+ length: Long): PartitionedFile = {
+ val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length)
+ PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), start, length, hosts,
file.getModificationTime, file.getLen, file.metadata)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]