This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ba3318e0d [VL] Fall back scan if file scheme is not supported by 
registered file systems (#6672)
ba3318e0d is described below

commit ba3318e0d2fd29e589bb7a0a705b3061065755d9
Author: Zhen Li <[email protected]>
AuthorDate: Mon Aug 12 11:11:08 2024 +0800

    [VL] Fall back scan if file scheme is not supported by registered file 
systems (#6672)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  3 +-
 .../utils/VeloxFileSystemValidationJniWrapper.java | 19 ++--------
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 25 +++++++++++-
 .../apache/gluten/execution/VeloxScanSuite.scala   | 44 ++++++++++++++++++++++
 cpp/velox/jni/VeloxJniWrapper.cc                   | 19 ++++++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |  3 +-
 .../apache/gluten/execution/BaseDataSource.scala   |  2 +
 .../execution/BasicScanExecTransformer.scala       | 16 +++++++-
 .../execution/BatchScanExecTransformer.scala       |  9 +++++
 .../execution/FileSourceScanExecTransformer.scala  |  5 +++
 .../FileIndexUtil.scala}                           | 25 +++++-------
 .../sql/hive/HiveTableScanExecTransformer.scala    |  3 ++
 .../gluten/execution/IcebergScanTransformer.scala  |  3 ++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 13 +++++++
 14 files changed, 153 insertions(+), 36 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 265750c2c..a115ca712 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -140,10 +140,11 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       .toLowerCase(Locale.getDefault)
   }
 
-  override def supportFileFormatRead(
+  override def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
       partTable: Boolean,
+      rootPaths: Seq[String],
       paths: Seq[String]): ValidationResult = {
 
     def validateFilePath: Boolean = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala 
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
similarity index 56%
copy from 
gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
copy to 
backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
index b34abd490..8e9bd3f03 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++ 
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
@@ -14,22 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.utils;
 
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.types.StructType
+public class VeloxFileSystemValidationJniWrapper {
 
-trait BaseDataSource {
-
-  /** Returns the actual schema of this data source scan. */
-  def getDataSchema: StructType
-
-  /** Returns the required partition schema, used to generate partition 
column. */
-  def getPartitionSchema: StructType
-
-  /** Returns the partitions generated by this data source scan. */
-  def getPartitions: Seq[InputPartition]
-
-  /** Returns the input file paths, used to validate the partition column path 
*/
-  def getInputFilePathsInternal: Seq[String]
+  public static native boolean allSupportedByRegisteredFileSystems(String[] 
paths);
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 33efdbc5e..d32911f4a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, 
OrcReadFormat, ParquetReadFormat}
+import org.apache.gluten.utils._
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, 
Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, 
NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank, 
RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary, 
SpecifiedWindowFrame, Uuid}
@@ -40,6 +41,8 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import org.apache.hadoop.fs.Path
+
 import scala.util.control.Breaks.breakable
 
 class VeloxBackend extends Backend {
@@ -70,11 +73,20 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   val MAXIMUM_BATCH_SIZE: Int = 32768
 
-  override def supportFileFormatRead(
+  override def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
       partTable: Boolean,
+      rootPaths: Seq[String],
       paths: Seq[String]): ValidationResult = {
+    val filteredRootPaths = distinctRootPaths(rootPaths)
+    if (
+      !filteredRootPaths.isEmpty && !VeloxFileSystemValidationJniWrapper
+        .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
+    ) {
+      return ValidationResult.failed(
+        s"Scheme of [$filteredRootPaths] is not supported by registered file 
systems.")
+    }
     // Validate if all types are supported.
     def validateTypes(validatorFunc: PartialFunction[StructField, String]): 
ValidationResult = {
       // Collect unsupported types.
@@ -179,6 +191,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
       .isDefined
   }
 
+  def distinctRootPaths(paths: Seq[String]): Seq[String] = {
+    // Skip native validation for local path, as local file system is always 
registered.
+    // For every file scheme, only one path is kept.
+    paths
+      .map(p => (new Path(p).toUri.getScheme, p))
+      .groupBy(_._1)
+      .filter(_._1 != "file")
+      .map(_._2.head._2)
+      .toSeq
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
index 688cca699..852a1f4fb 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
@@ -16,6 +16,10 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
+import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.GreaterThan
 import org.apache.spark.sql.execution.ScalarSubquery
@@ -74,4 +78,44 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite 
{
       }
     }
   }
+
+  test("Test file scheme validation") {
+    withTempPath {
+      path =>
+        withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "false") {
+          spark
+            .range(100)
+            .selectExpr("cast(id % 9 as int) as c1")
+            .write
+            .format("parquet")
+            .save(path.getCanonicalPath)
+          runQueryAndCompare(s"SELECT count(*) FROM 
`parquet`.`${path.getCanonicalPath}`") {
+            df =>
+              val plan = df.queryExecution.executedPlan
+              val fileScan = collect(plan) { case s: 
FileSourceScanExecTransformer => s }
+              assert(fileScan.size == 1)
+              val rootPaths = fileScan(0).getRootPathsInternal
+              assert(rootPaths.length == 1)
+              assert(rootPaths(0).startsWith("file:/"))
+              assert(
+                
VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+                  rootPaths.toArray))
+          }
+        }
+    }
+    val filteredRootPath =
+      VeloxBackendSettings.distinctRootPaths(
+        Seq("file:/test_path/", "test://test/s", "test://test1/s"))
+    assert(filteredRootPath.length == 1)
+    assert(filteredRootPath(0).startsWith("test://"))
+    assert(
+      VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+        Array("file:/test_path/")))
+    assert(
+      !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+        Array("unsupported://test_path")))
+    assert(
+      !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+        Array("file:/test_path/", "unsupported://test_path")))
+  }
 }
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index cb49abd7d..5df3a478e 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -33,6 +33,7 @@
 #include "utils/ObjectStore.h"
 #include "utils/VeloxBatchResizer.h"
 #include "velox/common/base/BloomFilter.h"
+#include "velox/common/file/FileSystems.h"
 
 #include <iostream>
 
@@ -260,6 +261,24 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
   JNI_METHOD_END(gluten::kInvalidObjectHandle)
 }
 
+JNIEXPORT jboolean JNICALL
+Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByRegisteredFileSystems(
 // NOLINT
+    JNIEnv* env,
+    jclass,
+    jobjectArray stringArray) {
+  JNI_METHOD_START
+  int size = env->GetArrayLength(stringArray);
+  for (int i = 0; i < size; i++) {
+    jstring string = (jstring)(env->GetObjectArrayElement(stringArray, i));
+    std::string path = jStringToCString(env, string);
+    if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) {
+      return false;
+    }
+  }
+  return true;
+  JNI_METHOD_END(false)
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 358043cc5..c9a0301b8 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -31,10 +31,11 @@ import 
org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
 import org.apache.spark.sql.types.StructField
 
 trait BackendSettingsApi {
-  def supportFileFormatRead(
+  def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
       partTable: Boolean,
+      rootPaths: Seq[String],
       paths: Seq[String]): ValidationResult = ValidationResult.succeeded
   def supportWriteFilesExec(
       format: FileFormat,
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
index b34abd490..1a0ff3f84 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
@@ -32,4 +32,6 @@ trait BaseDataSource {
 
   /** Returns the input file paths, used to validate the partition column path 
*/
   def getInputFilePathsInternal: Seq[String]
+
+  def getRootPathsInternal: Seq[String]
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 04697280d..b7953b3ac 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
 import org.apache.gluten.extension.ValidationResult
@@ -59,6 +60,14 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
     }
   }
 
+  def getRootFilePaths: Seq[String] = {
+    if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
+      getRootPathsInternal
+    } else {
+      Seq.empty
+    }
+  }
+
   /** Returns the file format properties. */
   def getProperties: Map[String, String] = Map.empty
 
@@ -92,7 +101,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
     }
 
     val validationResult = BackendsApiManager.getSettings
-      .supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty, 
getInputFilePaths)
+      .validateScan(
+        fileFormat,
+        fields,
+        getPartitionSchema.nonEmpty,
+        getRootFilePaths,
+        getInputFilePaths)
     if (!validationResult.ok()) {
       return validationResult
     }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 4860847de..553c7c4e0 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -131,6 +132,14 @@ abstract class BatchScanExecTransformerBase(
     }
   }
 
+  override def getRootPathsInternal: Seq[String] = {
+    scan match {
+      case fileScan: FileScan =>
+        FileIndexUtil.getRootPath(fileScan.fileIndex)
+      case _ => Seq.empty
+    }
+  }
+
   override def doValidateInternal(): ValidationResult = {
     if (pushedAggregate.nonEmpty) {
       return ValidationResult.failed(s"Unsupported aggregation push down for 
$scan.")
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 3b8ed1167..af49cfd1b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.utils.FileIndexUtil
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, PlanExpression}
@@ -126,6 +127,10 @@ abstract class FileSourceScanExecTransformerBase(
     relation.location.inputFiles.toSeq
   }
 
+  override def getRootPathsInternal: Seq[String] = {
+    FileIndexUtil.getRootPath(relation.location)
+  }
+
   override protected def doValidateInternal(): ValidationResult = {
     if (
       !metadataColumns.isEmpty && 
!BackendsApiManager.getSettings.supportNativeMetadataColumns()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala 
b/gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
similarity index 56%
copy from 
gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
copy to gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
index b34abd490..dab593b24 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
@@ -14,22 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.utils
 
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources._
 
-trait BaseDataSource {
-
-  /** Returns the actual schema of this data source scan. */
-  def getDataSchema: StructType
-
-  /** Returns the required partition schema, used to generate partition 
column. */
-  def getPartitionSchema: StructType
-
-  /** Returns the partitions generated by this data source scan. */
-  def getPartitions: Seq[InputPartition]
-
-  /** Returns the input file paths, used to validate the partition column path 
*/
-  def getInputFilePathsInternal: Seq[String]
+object FileIndexUtil {
+  def getRootPath(index: FileIndex): Seq[String] = {
+    index.rootPaths
+      .filter(_.isAbsolute)
+      .map(_.toString)
+      .toSeq
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 8133f1d42..938bac2b1 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -76,6 +76,9 @@ case class HiveTableScanExecTransformer(
     Seq.empty
   }
 
+  // TODO: get root paths from hive table.
+  override def getRootPathsInternal: Seq[String] = Seq.empty
+
   override def metricsUpdater(): MetricsUpdater =
     
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)
 
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 5a735b802..9fb8521d9 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -54,6 +54,9 @@ case class IcebergScanTransformer(
 
   override def getInputFilePathsInternal: Seq[String] = Seq.empty
 
+  // TODO: get root paths from table.
+  override def getRootPathsInternal: Seq[String] = Seq.empty
+
   override lazy val fileFormat: ReadFileFormat = 
GlutenIcebergSourceUtil.getFileFormat(scan)
 
   override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): 
Seq[SplitInfo] = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9ddce7191..b1ef4be5c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -116,6 +116,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def forceParquetTimestampTypeScanFallbackEnabled: Boolean =
     conf.getConf(VELOX_FORCE_PARQUET_TIMESTAMP_TYPE_SCAN_FALLBACK)
 
+  def scanFileSchemeValidationEnabled: Boolean =
+    conf.getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
+
   // whether to use ColumnarShuffleManager
   def isUseColumnarShuffleManager: Boolean =
     conf
@@ -2009,6 +2012,16 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(false)
 
+  val VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED =
+    buildConf("spark.gluten.sql.scan.fileSchemeValidation.enabled")
+      .internal()
+      .doc(
+        "When true, enable file path scheme validation for scan. Validation 
will fail if" +
+          " file scheme is not supported by registered file systems, which 
will cause the scan" +
+          " operator to fall back.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED =
     buildConf("spark.gluten.sql.columnar.cast.avg")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to