This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c144443443 Add config to support viewfs in Gluten. (#7892)
c144443443 is described below
commit c144443443580baf0f97dbfb721d66bc9cb21faa
Author: JiaKe <[email protected]>
AuthorDate: Mon Nov 18 09:01:19 2024 +0800
Add config to support viewfs in Gluten. (#7892)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 4 +--
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 2 +-
.../GlutenClickHouseMergeTreeWriteSuite.scala | 6 ++--
.../backendsapi/velox/VeloxIteratorApi.scala | 20 +++---------
.../gluten/execution/IcebergScanTransformer.scala | 5 +--
.../gluten/execution/VeloxIcebergSuite.scala | 6 ++--
.../gluten/substrait/rel/LocalFilesNode.java | 5 +++
.../apache/gluten/backendsapi/IteratorApi.scala | 4 +--
.../execution/BasicScanExecTransformer.scala | 12 +++----
.../gluten/execution/WholeStageTransformer.scala | 37 +++++++++++++++++-----
.../scala/org/apache/gluten/GlutenConfig.scala | 9 ++++++
11 files changed, 62 insertions(+), 48 deletions(-)
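
For readers skimming the patch: the mechanism underneath is Hadoop's ViewFileSystem, whose resolvePath() follows the viewfs mount table to the backing filesystem. A minimal standalone sketch of that call; the mount-table entry and namenode address below are hypothetical, and resolvePath() against HDFS may need a reachable namenode:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object ViewfsResolveSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Hypothetical mount-table entry; real deployments ship these in
        // core-site.xml or a separate mount-table file.
        conf.set("fs.viewfs.mounttable.cluster.link./data", "hdfs://nn1:8020/data")

        val viewPath = new Path("viewfs://cluster/data/part-00000.parquet")
        // FileSystem.get on a viewfs URI returns a ViewFileSystem;
        // resolvePath follows the mount table to the concrete hdfs:// path.
        val fs = FileSystem.get(viewPath.toUri, conf)
        println(fs.resolvePath(viewPath))
        // e.g. hdfs://nn1:8020/data/part-00000.parquet
      }
    }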
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index dd5a736e75..ff268b95d8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
import java.lang.{Long => JLong}
import java.net.URI
@@ -133,8 +132,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 571dc4ba92..c0f509c68c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(1)(plans.head.getSplitInfos(null).size)
+ assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 85c8c2d92a..72adee309d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1807,7 +1807,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+ assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1831,7 +1831,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(1)(plans.head.getSplitInfos(null).size)
+ assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1939,7 +1939,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
- assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+ assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 320d1f366c..061daaac0f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
modificationTimes,
partitionColumns,
metadataColumns) =
- constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
+ constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
- metadataColumnNames: Seq[String],
- serializableHadoopConf: SerializableConfiguration) = {
+ metadataColumnNames: Seq[String]) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded
path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by
using GlutenURLDecoder
- var filePath = file.filePath.toString
- if (filePath.startsWith("viewfs")) {
- val viewPath = new Path(filePath)
- val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
- filePath = viewFileSystem.resolvePath(viewPath).toString
- }
paths.add(
GlutenURLDecoder
- .decode(filePath, StandardCharsets.UTF_8.name()))
+ .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 10d24c317c..1cbeb52a92 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
@@ -59,9 +58,7 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
- override def getSplitInfosFromPartitions(
- partitions: Seq[InputPartition],
- serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 7f399ce629..de71d341db 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
case _ => // do nothing
}
checkLengthAndPlan(df, 5)
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 04bb9d8cf4..9513f49760 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -92,6 +92,11 @@ public class LocalFilesNode implements SplitInfo {
return paths;
}
+ public void setPaths(List<String> newPaths) {
+ paths.clear();
+ paths.addAll(newPaths);
+ }
+
public void setFileSchema(StructType schema) {
this.fileSchema = schema;
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 69c9d37334..11211bd0da 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
trait IteratorApi {
@@ -38,8 +37,7 @@ trait IteratorApi {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo
+ properties: Map[String, String]): SplitInfo
/** Generate native row partition. */
def genPartitions(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d768ac2c59..73ed35e719 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
@@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty
/** Returns the split infos that will be processed by the underlying native engine. */
- def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
- getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
+ def getSplitInfos(): Seq[SplitInfo] = {
+ getSplitInfosFromPartitions(getPartitions)
}
- def getSplitInfosFromPartitions(
- partitions: Seq[InputPartition],
- serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
@@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
- getProperties,
- serializableHadoopConf))
+ getProperties))
}
override protected def doValidateInternal(): ValidationResult = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 70839ffc2e..beb7fe5f99 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
-import org.apache.gluten.substrait.rel.{RelNode, SplitInfo}
+import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
import org.apache.gluten.utils.SubstraitPlanPrinterUtil
import org.apache.spark._
@@ -43,7 +43,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
import com.google.common.collect.Lists
+import org.apache.hadoop.fs.{FileSystem, Path}
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -127,8 +129,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
val sparkConf: SparkConf = sparkContext.getConf
+
val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
sparkContext.hadoopConfiguration)
+
val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo
@transient
@@ -289,10 +293,28 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
*/
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
- getSplitInfosFromPartitions(
- basicScanExecTransformers,
- allScanPartitions,
- serializableHadoopConf)
+ getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
+ if (GlutenConfig.getConf.enableHdfsViewfs) {
+ allScanSplitInfos.foreach {
+ splitInfos =>
+ splitInfos.foreach {
+ case splitInfo: LocalFilesNode =>
+ val paths = splitInfo.getPaths.asScala
+ if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
+ // Convert the viewfs path into hdfs
+ val newPaths = paths.map {
+ viewfsPath =>
+ val viewPath = new Path(viewfsPath)
+ val viewFileSystem =
+ FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
+ viewFileSystem.resolvePath(viewPath).toString
+ }
+ splitInfo.setPaths(newPaths.asJava)
+ }
+ }
+ }
+ }
+
val inputPartitions =
BackendsApiManager.getIteratorApiInstance.genPartitions(
wsCtx,
@@ -384,8 +406,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
private def getSplitInfosFromPartitions(
basicScanExecTransformers: Seq[BasicScanExecTransformer],
- allScanPartitions: Seq[Seq[InputPartition]],
- serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = {
+ allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted. We should combine the two scan
// transformers' partitions with same index, and set them together in
@@ -404,7 +425,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanSplitInfos = allScanPartitions.zip(basicScanExecTransformers).map {
case (partition, transformer) =>
- transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf)
+ transformer.getSplitInfosFromPartitions(partition)
}
val partitionLength = allScanSplitInfos.head.size
if (allScanSplitInfos.exists(_.size != partitionLength)) {
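
Design note on this file: the viewfs handling that previously ran per file inside VeloxIteratorApi's constructSplitInfo now runs once on the driver, gated by the new config, which is what lets the serializableHadoopConf parameter disappear from IteratorApi, BasicScanExecTransformer and the Iceberg transformer. The added block inspects only paths.head, so it assumes every path within a split shares one scheme. A standalone distillation of the loop body; the object and method names here are ours, not from the patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object ViewfsRewrite {
      // Mirrors the block added above: rewrite only when the first path is
      // viewfs, assuming all paths in one split share that scheme.
      def resolveViewfsPaths(paths: Seq[String], hadoopConf: Configuration): Seq[String] =
        if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
          paths.map { p =>
            val viewPath = new Path(p)
            FileSystem.get(viewPath.toUri, hadoopConf).resolvePath(viewPath).toString
          }
        } else paths
    }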
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5b15faf646..107c33a241 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -478,6 +478,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
+
+ def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
}
object GlutenConfig {
@@ -2193,4 +2195,11 @@ object GlutenConfig {
"Otherwise, do nothing.")
.booleanConf
.createWithDefault(false)
+
+ val HDFS_VIEWFS_ENABLED =
+ buildStaticConf("spark.gluten.storage.hdfsViewfs.enabled")
+ .internal()
+ .doc("If enabled, gluten will convert the viewfs path to hdfs path in
scala side")
+ .booleanConf
+ .createWithDefault(false)
}
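
To exercise the feature end to end: HDFS_VIEWFS_ENABLED is built with buildStaticConf, so it must be set before the session starts, e.g. via --conf on spark-submit. A hedged usage sketch; the plugin class and table path below are illustrative:

    import org.apache.spark.sql.SparkSession

    // spark.gluten.storage.hdfsViewfs.enabled is a static conf, so set it
    // at session build time, not afterwards.
    val spark = SparkSession
      .builder()
      .appName("gluten-viewfs-demo")
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .config("spark.gluten.storage.hdfsViewfs.enabled", "true")
      .getOrCreate()

    // Split paths for viewfs:// scans are now rewritten to hdfs:// on the
    // driver before being handed to the native engine.
    spark.read.parquet("viewfs://cluster/data/events").show()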
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]