This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 93c066a19d [GLUTEN-11050] Regenerate input partitions for small files
(#11051)
93c066a19d is described below
commit 93c066a19ddff61e8929662b88d32d0bf405a72a
Author: Rong Ma <[email protected]>
AuthorDate: Thu Nov 13 13:14:46 2025 +0000
[GLUTEN-11050] Regenerate input partitions for small files (#11051)
---
docs/Configuration.md | 1 +
.../org/apache/gluten/config/GlutenConfig.scala | 12 +++
.../org/apache/gluten/utils/PartitionsUtil.scala | 98 +++++++++++++++++++-
.../apache/gluten/utils/PartitionsUtilSuite.scala | 102 +++++++++++++++++++++
.../apache/spark/sql/GlutenSQLQueryTestSuite.scala | 1 +
.../apache/spark/sql/GlutenSQLQueryTestSuite.scala | 1 +
.../apache/spark/sql/GlutenSQLQueryTestSuite.scala | 1 +
.../apache/spark/sql/GlutenSQLQueryTestSuite.scala | 1 +
.../gluten/execution/PartitionedFileUtilShim.scala | 26 ++++++
.../gluten/execution/PartitionedFileUtilShim.scala | 26 ++++++
.../gluten/execution/PartitionedFileUtilShim.scala | 27 ++++++
.../gluten/execution/PartitionedFileUtilShim.scala | 5 +
.../gluten/execution/PartitionedFileUtilShim.scala | 5 +
13 files changed, 304 insertions(+), 2 deletions(-)
diff --git a/docs/Configuration.md b/docs/Configuration.md
index c3d0c396fd..ec36344365 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -102,6 +102,7 @@ nav_order: 15
| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000
| The threshold to determine whether to use sort-based columnar
shuffle. Sort-based shuffle will be used if the number of partitions is greater
than this threshold.
|
| spark.gluten.sql.columnar.shuffledHashJoin | true
| Enable or disable columnar shuffledHashJoin.
|
| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true
| Whether to allow Gluten to choose an optimal build side for
shuffled hash join.
|
+| spark.gluten.sql.columnar.smallFileThreshold | 0.5
| The total size threshold of small files in table scan.To avoid
small files being placed into the same partition, Gluten will try to distribute
small files into different partitions when the total size of small files is
below this threshold.
|
| spark.gluten.sql.columnar.sort | true
| Enable or disable columnar sort.
|
| spark.gluten.sql.columnar.sortMergeJoin | true
| Enable or disable columnar sortMergeJoin. This should be set with
preferSortMergeJoin=false.
|
| spark.gluten.sql.columnar.tableCache | false
| Enable or disable columnar table cache.
|
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index beb630c0ac..b8a37d8478 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -289,6 +289,8 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def extendedExpressionTransformer: String =
getConf(EXTENDED_EXPRESSION_TRAN_CONF)
+ def smallFileThreshold: Double = getConf(SMALL_FILE_THRESHOLD)
+
def expressionBlacklist: Set[String] = {
val blacklistSet = getConf(EXPRESSION_BLACK_LIST)
.map(_.toLowerCase(Locale.ROOT).split(",").map(_.trim()).filter(_.nonEmpty).toSet)
@@ -1582,4 +1584,14 @@ object GlutenConfig extends ConfigRegistry {
.doc("Enable or disable columnar collectTail.")
.booleanConf
.createWithDefault(true)
+
+ val SMALL_FILE_THRESHOLD =
+ buildConf("spark.gluten.sql.columnar.smallFileThreshold")
+ .doc(
+ "The total size threshold of small files in table scan." +
+ "To avoid small files being placed into the same partition, " +
+ "Gluten will try to distribute small files into different partitions
when the " +
+ "total size of small files is below this threshold.")
+ .doubleConf
+ .createWithDefault(0.5)
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
index 953f95bf23..0f9c5deb23 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
@@ -16,17 +16,21 @@
*/
package org.apache.gluten.utils
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.utils.PartitionsUtil.regeneratePartition
import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.datasources.{BucketingUtils,
FilePartition, HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{BucketingUtils,
FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
import org.apache.hadoop.fs.Path
+import scala.collection.mutable
+
case class PartitionsUtil(
relation: HadoopFsRelation,
requiredSchema: StructType,
@@ -96,7 +100,10 @@ case class PartitionsUtil(
}
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
- FilePartition.getFilePartitions(relation.sparkSession, splitFiles,
maxSplitBytes)
+ val inputPartitions =
+ FilePartition.getFilePartitions(relation.sparkSession, splitFiles,
maxSplitBytes)
+
+ regeneratePartition(inputPartitions, GlutenConfig.get.smallFileThreshold)
}
private def genBucketedPartitionSeq(): Seq[Partition] = {
@@ -140,3 +147,90 @@ case class PartitionsUtil(
output.find(_.name == colName)
}
}
+
+object PartitionsUtil {
+
+ /**
+ * Regenerate the partitions by balancing the number of files per partition
and total size per
+ * partition.
+ */
+ def regeneratePartition(
+ inputPartitions: Seq[FilePartition],
+ smallFileThreshold: Double): Seq[FilePartition] = {
+
+ // Flatten and sort descending by file size.
+ val filesSorted: Seq[(PartitionedFile, Long)] =
+ inputPartitions
+ .flatMap(_.files)
+ .map(f => (f, f.length))
+ .sortBy(_._2)(Ordering.Long.reverse)
+
+ val partitions =
Array.fill(inputPartitions.size)(mutable.ArrayBuffer.empty[PartitionedFile])
+
+ def addToBucket(
+ heap: mutable.PriorityQueue[(Long, Int, Int)],
+ file: PartitionedFile,
+ sz: Long): Unit = {
+ val (load, numFiles, idx) = heap.dequeue()
+ partitions(idx) += file
+ heap.enqueue((load + sz, numFiles + 1, idx))
+ }
+
+ // First by load, then by numFiles.
+ val heapByFileSize =
+ mutable.PriorityQueue.empty[(Long, Int, Int)](
+ Ordering
+ .by[(Long, Int, Int), (Long, Int)] {
+ case (load, numFiles, _) =>
+ (load, numFiles)
+ }
+ .reverse
+ )
+
+ if (smallFileThreshold > 0) {
+ val smallFileTotalSize = filesSorted.map(_._2).sum * smallFileThreshold
+ // First by numFiles, then by load.
+ val heapByFileNum =
+ mutable.PriorityQueue.empty[(Long, Int, Int)](
+ Ordering
+ .by[(Long, Int, Int), (Int, Long)] {
+ case (load, numFiles, _) =>
+ (numFiles, load)
+ }
+ .reverse
+ )
+
+ inputPartitions.indices.foreach(i => heapByFileNum.enqueue((0L, 0, i)))
+
+ var numSmallFiles = 0
+ var smallFileSize = 0L
+ // Enqueue small files to the least number of files and the least load.
+ filesSorted.reverse.takeWhile(f => f._2 + smallFileSize <=
smallFileTotalSize).foreach {
+ case (file, sz) =>
+ addToBucket(heapByFileNum, file, sz)
+ numSmallFiles += 1
+ smallFileSize += sz
+ }
+
+ // Move buckets from heapByFileNum to heapByFileSize.
+ while (heapByFileNum.nonEmpty) {
+ heapByFileSize.enqueue(heapByFileNum.dequeue())
+ }
+
+ // Finally, enqueue remaining files.
+ filesSorted.take(filesSorted.size - numSmallFiles).foreach {
+ case (file, sz) =>
+ addToBucket(heapByFileSize, file, sz)
+ }
+ } else {
+ inputPartitions.indices.foreach(i => heapByFileSize.enqueue((0L, 0, i)))
+
+ filesSorted.foreach {
+ case (file, sz) =>
+ addToBucket(heapByFileSize, file, sz)
+ }
+ }
+
+ partitions.zipWithIndex.map { case (p, idx) => FilePartition(idx,
p.toArray) }
+ }
+}
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/utils/PartitionsUtilSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/utils/PartitionsUtilSuite.scala
new file mode 100644
index 0000000000..0a9719ab74
--- /dev/null
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/utils/PartitionsUtilSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.execution.PartitionedFileUtilShim
+
+import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile}
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class PartitionsUtilSuite extends AnyFunSuite {
+
+ private def makePartitionedFile(path: String, length: Long): PartitionedFile
=
+ PartitionedFileUtilShim.makePartitionedFileFromPath(path, length)
+
+ private def makeFilePartitions(
+ files: Seq[PartitionedFile],
+ numPartitions: Int): Seq[FilePartition] = {
+ val numGroups = files.size / numPartitions +
+ (if (files.size % numPartitions == 0) 0 else 1)
+ files.grouped(numGroups).toSeq.zipWithIndex.map {
+ case (p, idx) => FilePartition(idx, p.toArray)
+ }
+ }
+
+ test("large files are distributed evenly by size") {
+ val files = Seq(
+ makePartitionedFile("f1", 100),
+ makePartitionedFile("f2", 90),
+ makePartitionedFile("f3", 80),
+ makePartitionedFile("f4", 70)
+ )
+ val initialPartitions = makeFilePartitions(files, 2)
+
+ val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.0)
+
+ assert(result.size === 2)
+
+ val sizes = result.map(_.files.map(_.length).sum)
+ assert(sizes.forall(_ === 170))
+ }
+
+ test("small files are distributed evenly by number of files") {
+ val files = (1 to 10).map(i => makePartitionedFile(s"f$i", 10))
+ val initialPartitions = makeFilePartitions(files, 5)
+
+ val result = PartitionsUtil.regeneratePartition(initialPartitions, 1.0)
+
+ assert(result.size === 5)
+ val counts = result.map(_.files.length)
+ assert(counts.forall(_ === 2))
+ }
+
+ test("small files should not be placed into one partition") {
+ val files = Seq(
+ makePartitionedFile("f1", 10),
+ makePartitionedFile("f2", 20),
+ makePartitionedFile("f3", 30),
+ makePartitionedFile("f4", 40),
+ makePartitionedFile("f5", 100)
+ )
+ val initialPartitions = makeFilePartitions(files, 2)
+
+ // Only "f5" is not small file.
+ val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.5)
+
+ assert(result.size === 2)
+ assert(result.forall(_.files.exists(_.length <= 40)))
+ }
+
+ test("zero length files") {
+ val files = Seq(
+ makePartitionedFile("f1", 0),
+ makePartitionedFile("f2", 0)
+ )
+ val initialPartitions = makeFilePartitions(files, 2)
+
+ val result = PartitionsUtil.regeneratePartition(initialPartitions, 0.0)
+
+ assert(result.size === 2)
+ assert(result.count(_.files.nonEmpty) === 2)
+ }
+
+ test("empty inputs") {
+ val result = PartitionsUtil.regeneratePartition(Seq.empty, 0.5)
+ assert(result.size === 0)
+ }
+}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index 0e0c0a70d5..084fc83c31 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -189,6 +189,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")
if (isCHBackend) {
conf
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index c55b744dc1..25e4773e9d 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -189,6 +189,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")
if (isCHBackend) {
conf
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index fddfe83fb5..cfc4f38ec0 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -193,6 +193,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")
if (isCHBackend) {
conf
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
index 6056ea2ac9..2ee9ecdaaf 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenSQLQueryTestSuite.scala
@@ -116,6 +116,7 @@ class GlutenSQLQueryTestSuite
.set("spark.memory.offHeap.size", "1024MB")
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set(GlutenConfig.SMALL_FILE_THRESHOLD.key, "0")
if (isCHBackend) {
conf
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
b/shims/spark32/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
new file mode 100644
index 0000000000..a6fdd6de2f
--- /dev/null
+++
b/shims/spark32/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+object PartitionedFileUtilShim {
+ // Helper method to create PartitionedFile from path and length.
+ def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile
= {
+ PartitionedFile(null, path, 0, length, Array.empty)
+ }
+}
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
b/shims/spark33/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
new file mode 100644
index 0000000000..a6fdd6de2f
--- /dev/null
+++
b/shims/spark33/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+object PartitionedFileUtilShim {
+ // Helper method to create PartitionedFile from path and length.
+ def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile
= {
+ PartitionedFile(null, path, 0, length, Array.empty)
+ }
+}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
b/shims/spark34/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
new file mode 100644
index 0000000000..cebefc2c4b
--- /dev/null
+++
b/shims/spark34/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+object PartitionedFileUtilShim {
+ // Helper method to create PartitionedFile from path and length.
+ def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile
= {
+ PartitionedFile(null, SparkPath.fromPathString(path), 0, length,
Array.empty)
+ }
+}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
b/shims/spark35/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
index 35fbf6338b..67f4058303 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
@@ -148,4 +149,8 @@ object PartitionedFileUtilShim {
}
}
+ // Helper method to create PartitionedFile from path and length.
+ def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile
= {
+ PartitionedFile(null, SparkPath.fromPathString(path), 0, length,
Array.empty)
+ }
}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
b/shims/spark40/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
index 838932add7..8c7b45ae2c 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/execution/PartitionedFileUtilShim.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
@@ -146,4 +147,8 @@ object PartitionedFileUtilShim {
}
}
+ // Helper method to create PartitionedFile from path and length.
+ def makePartitionedFileFromPath(path: String, length: Long): PartitionedFile
= {
+ PartitionedFile(null, SparkPath.fromPathString(path), 0, length,
Array.empty)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]