This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c144443443 Add config to support viewfs in Gluten. (#7892)
c144443443 is described below

commit c144443443580baf0f97dbfb721d66bc9cb21faa
Author: JiaKe <[email protected]>
AuthorDate: Mon Nov 18 09:01:19 2024 +0800

    Add config to support viewfs in Gluten. (#7892)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  4 +--
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |  2 +-
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |  6 ++--
 .../backendsapi/velox/VeloxIteratorApi.scala       | 20 +++---------
 .../gluten/execution/IcebergScanTransformer.scala  |  5 +--
 .../gluten/execution/VeloxIcebergSuite.scala       |  6 ++--
 .../gluten/substrait/rel/LocalFilesNode.java       |  5 +++
 .../apache/gluten/backendsapi/IteratorApi.scala    |  4 +--
 .../execution/BasicScanExecTransformer.scala       | 12 +++----
 .../gluten/execution/WholeStageTransformer.scala   | 37 +++++++++++++++++-----
 .../scala/org/apache/gluten/GlutenConfig.scala     |  9 ++++++
 11 files changed, 62 insertions(+), 48 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index dd5a736e75..ff268b95d8 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 import java.lang.{Long => JLong}
 import java.net.URI
@@ -133,8 +132,7 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 571dc4ba92..c0f509c68c 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
             case scanExec: BasicScanExecTransformer => scanExec
           }
           assertResult(1)(plans.size)
-          assertResult(1)(plans.head.getSplitInfos(null).size)
+          assertResult(1)(plans.head.getSplitInfos.size)
       }
     }
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 85c8c2d92a..72adee309d 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1807,7 +1807,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case scanExec: BasicScanExecTransformer => scanExec
               }
               assertResult(1)(plans.size)
-              assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+              assertResult(conf._2)(plans.head.getSplitInfos.size)
           }
         }
       })
@@ -1831,7 +1831,7 @@ class GlutenClickHouseMergeTreeWriteSuite
             case scanExec: BasicScanExecTransformer => scanExec
           }
           assertResult(1)(plans.size)
-          assertResult(1)(plans.head.getSplitInfos(null).size)
+          assertResult(1)(plans.head.getSplitInfos.size)
       }
     }
   }
@@ -1939,7 +1939,7 @@ class GlutenClickHouseMergeTreeWriteSuite
                 case f: BasicScanExecTransformer => f
               }
               assertResult(2)(scanExec.size)
-              assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+              assertResult(conf._2)(scanExec(1).getSplitInfos.size)
           }
         }
       })
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 320d1f366c..061daaac0f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, 
SparkDirectoryUtil}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           modificationTimes,
           partitionColumns,
           metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, 
serializableHadoopConf)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String],
-      serializableHadoopConf: SerializableConfiguration) = {
+      metadataColumnNames: Seq[String]) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded 
path, so the decoded
         // path is incorrect in some cases and here fix the case of ' ' by 
using GlutenURLDecoder
-        var filePath = file.filePath.toString
-        if (filePath.startsWith("viewfs")) {
-          val viewPath = new Path(filePath)
-          val viewFileSystem = FileSystem.get(viewPath.toUri, 
serializableHadoopConf.value)
-          filePath = viewFileSystem.resolvePath(viewPath).toString
-        }
         paths.add(
           GlutenURLDecoder
-            .decode(filePath, StandardCharsets.UTF_8.name()))
+            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val (fileSize, modificationTime) =
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 10d24c317c..1cbeb52a92 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
 
@@ -59,9 +58,7 @@ case class IcebergScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = 
GlutenIcebergSourceUtil.getFileFormat(scan)
 
-  override def getSplitInfosFromPartitions(
-      partitions: Seq[InputPartition],
-      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
     val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
       scan,
       keyGroupedPartitioning,
diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 7f399ce629..de71d341db 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
3)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                 case plan if plan.isInstanceOf[IcebergScanTransformer] =>
                   assert(
                     
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 
1)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
                 case _ => // do nothing
               }
               checkLengthAndPlan(df, 5)
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 04bb9d8cf4..9513f49760 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -92,6 +92,11 @@ public class LocalFilesNode implements SplitInfo {
     return paths;
   }
 
+  public void setPaths(List<String> newPaths) {
+    paths.clear();
+    paths.addAll(newPaths);
+  }
+
   public void setFileSchema(StructType schema) {
     this.fileSchema = schema;
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 69c9d37334..11211bd0da 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 trait IteratorApi {
 
@@ -38,8 +37,7 @@ trait IteratorApi {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo
+      properties: Map[String, String]): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d768ac2c59..73ed35e719 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
StructType}
-import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   /** Returns the split infos that will be processed by the underlying native 
engine. */
-  def getSplitInfos(serializableHadoopConf: SerializableConfiguration): 
Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
+  def getSplitInfos(): Seq[SplitInfo] = {
+    getSplitInfosFromPartitions(getPartitions)
   }
 
-  def getSplitInfosFromPartitions(
-      partitions: Seq[InputPartition],
-      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
         .genSplitInfo(
@@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
           getPartitionSchema,
           fileFormat,
           getMetadataColumns.map(_.name),
-          getProperties,
-          serializableHadoopConf))
+          getProperties))
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 70839ffc2e..beb7fe5f99 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.metrics.{GlutenTimeMetric, 
MetricsUpdater}
 import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
-import org.apache.gluten.substrait.rel.{RelNode, SplitInfo}
+import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
 import org.apache.gluten.utils.SubstraitPlanPrinterUtil
 
 import org.apache.spark._
@@ -43,7 +43,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
+import org.apache.hadoop.fs.{FileSystem, Path}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -127,8 +129,10 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
 
   val sparkConf: SparkConf = sparkContext.getConf
+
   val serializableHadoopConf: SerializableConfiguration = new 
SerializableConfiguration(
     sparkContext.hadoopConfiguration)
+
   val numaBindingInfo: GlutenNumaBindingInfo = 
GlutenConfig.getConf.numaBindingInfo
 
   @transient
@@ -289,10 +293,28 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
        */
       val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
       val allScanSplitInfos =
-        getSplitInfosFromPartitions(
-          basicScanExecTransformers,
-          allScanPartitions,
-          serializableHadoopConf)
+        getSplitInfosFromPartitions(basicScanExecTransformers, 
allScanPartitions)
+      if (GlutenConfig.getConf.enableHdfsViewfs) {
+        allScanSplitInfos.foreach {
+          splitInfos =>
+            splitInfos.foreach {
+              case splitInfo: LocalFilesNode =>
+                val paths = splitInfo.getPaths.asScala
+                if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
+                  // Convert the viewfs path into hdfs
+                  val newPaths = paths.map {
+                    viewfsPath =>
+                      val viewPath = new Path(viewfsPath)
+                      val viewFileSystem =
+                        FileSystem.get(viewPath.toUri, 
serializableHadoopConf.value)
+                      viewFileSystem.resolvePath(viewPath).toString
+                  }
+                  splitInfo.setPaths(newPaths.asJava)
+                }
+            }
+        }
+      }
+
       val inputPartitions =
         BackendsApiManager.getIteratorApiInstance.genPartitions(
           wsCtx,
@@ -384,8 +406,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
 
   private def getSplitInfosFromPartitions(
       basicScanExecTransformers: Seq[BasicScanExecTransformer],
-      allScanPartitions: Seq[Seq[InputPartition]],
-      serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] 
= {
+      allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
     // If these are two scan transformers, they must have same partitions,
     // otherwise, exchange will be inserted. We should combine the two scan
     // transformers' partitions with same index, and set them together in
@@ -404,7 +425,7 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
     val allScanSplitInfos =
       allScanPartitions.zip(basicScanExecTransformers).map {
         case (partition, transformer) =>
-          transformer.getSplitInfosFromPartitions(partition, 
serializableHadoopConf)
+          transformer.getSplitInfosFromPartitions(partition)
       }
     val partitionLength = allScanSplitInfos.head.size
     if (allScanSplitInfos.exists(_.size != partitionLength)) {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 5b15faf646..107c33a241 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -478,6 +478,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def enableHiveFileFormatWriter: Boolean = 
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 
   def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
+
+  def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
 }
 
 object GlutenConfig {
@@ -2193,4 +2195,11 @@ object GlutenConfig {
           "Otherwise, do nothing.")
       .booleanConf
       .createWithDefault(false)
+
+  val HDFS_VIEWFS_ENABLED =
+    buildStaticConf("spark.gluten.storage.hdfsViewfs.enabled")
+      .internal()
+      .doc("If enabled, gluten will convert the viewfs path to hdfs path in 
scala side")
+      .booleanConf
+      .createWithDefault(false)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to