This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ba3318e0d [VL] Fall back scan if file scheme is not supported by
registered file systems (#6672)
ba3318e0d is described below
commit ba3318e0d2fd29e589bb7a0a705b3061065755d9
Author: Zhen Li <[email protected]>
AuthorDate: Mon Aug 12 11:11:08 2024 +0800
[VL] Fall back scan if file scheme is not supported by registered file
systems (#6672)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 3 +-
.../utils/VeloxFileSystemValidationJniWrapper.java | 19 ++--------
.../gluten/backendsapi/velox/VeloxBackend.scala | 25 +++++++++++-
.../apache/gluten/execution/VeloxScanSuite.scala | 44 ++++++++++++++++++++++
cpp/velox/jni/VeloxJniWrapper.cc | 19 ++++++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 3 +-
.../apache/gluten/execution/BaseDataSource.scala | 2 +
.../execution/BasicScanExecTransformer.scala | 16 +++++++-
.../execution/BatchScanExecTransformer.scala | 9 +++++
.../execution/FileSourceScanExecTransformer.scala | 5 +++
.../FileIndexUtil.scala} | 25 +++++-------
.../sql/hive/HiveTableScanExecTransformer.scala | 3 ++
.../gluten/execution/IcebergScanTransformer.scala | 3 ++
.../scala/org/apache/gluten/GlutenConfig.scala | 13 +++++++
14 files changed, 153 insertions(+), 36 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 265750c2c..a115ca712 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -140,10 +140,11 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
.toLowerCase(Locale.getDefault)
}
- override def supportFileFormatRead(
+ override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
+ rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {
def validateFilePath: Boolean = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
similarity index 56%
copy from
gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
copy to
backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
index b34abd490..8e9bd3f03 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxFileSystemValidationJniWrapper.java
@@ -14,22 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.utils;
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.types.StructType
+public class VeloxFileSystemValidationJniWrapper {
-trait BaseDataSource {
-
- /** Returns the actual schema of this data source scan. */
- def getDataSchema: StructType
-
- /** Returns the required partition schema, used to generate partition
column. */
- def getPartitionSchema: StructType
-
- /** Returns the partitions generated by this data source scan. */
- def getPartitions: Seq[InputPartition]
-
- /** Returns the input file paths, used to validate the partition column path
*/
- def getInputFilePathsInternal: Seq[String]
+ public static native boolean allSupportedByRegisteredFileSystems(String[]
paths);
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 33efdbc5e..d32911f4a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat,
OrcReadFormat, ParquetReadFormat}
+import org.apache.gluten.utils._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank,
Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval,
NamedExpression, NthValue, NTile, PercentRank, Pi, Rand, RangeFrame, Rank,
RowNumber, SortOrder, SparkPartitionID, SparkVersion, SpecialFrameBoundary,
SpecifiedWindowFrame, Uuid}
@@ -40,6 +41,8 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.hadoop.fs.Path
+
import scala.util.control.Breaks.breakable
class VeloxBackend extends Backend {
@@ -70,11 +73,20 @@ object VeloxBackendSettings extends BackendSettingsApi {
val MAXIMUM_BATCH_SIZE: Int = 32768
- override def supportFileFormatRead(
+ override def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
+ rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = {
+ val filteredRootPaths = distinctRootPaths(rootPaths)
+ if (
+ !filteredRootPaths.isEmpty && !VeloxFileSystemValidationJniWrapper
+ .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
+ ) {
+ return ValidationResult.failed(
+ s"Scheme of [$filteredRootPaths] is not supported by registered file
systems.")
+ }
// Validate if all types are supported.
def validateTypes(validatorFunc: PartialFunction[StructField, String]):
ValidationResult = {
// Collect unsupported types.
@@ -179,6 +191,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
.isDefined
}
+ def distinctRootPaths(paths: Seq[String]): Seq[String] = {
+ // Skip native validation for local path, as local file system is always
registered.
+ // For evey file scheme, only one path is kept.
+ paths
+ .map(p => (new Path(p).toUri.getScheme, p))
+ .groupBy(_._1)
+ .filter(_._1 != "file")
+ .map(_._2.head._2)
+ .toSeq
+ }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
index 688cca699..852a1f4fb 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala
@@ -16,6 +16,10 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
+import org.apache.gluten.utils.VeloxFileSystemValidationJniWrapper
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.GreaterThan
import org.apache.spark.sql.execution.ScalarSubquery
@@ -74,4 +78,44 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite
{
}
}
}
+
+ test("Test file scheme validation") {
+ withTempPath {
+ path =>
+ withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "false") {
+ spark
+ .range(100)
+ .selectExpr("cast(id % 9 as int) as c1")
+ .write
+ .format("parquet")
+ .save(path.getCanonicalPath)
+ runQueryAndCompare(s"SELECT count(*) FROM
`parquet`.`${path.getCanonicalPath}`") {
+ df =>
+ val plan = df.queryExecution.executedPlan
+ val fileScan = collect(plan) { case s:
FileSourceScanExecTransformer => s }
+ assert(fileScan.size == 1)
+ val rootPaths = fileScan(0).getRootPathsInternal
+ assert(rootPaths.length == 1)
+ assert(rootPaths(0).startsWith("file:/"))
+ assert(
+
VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+ rootPaths.toArray))
+ }
+ }
+ }
+ val filteredRootPath =
+ VeloxBackendSettings.distinctRootPaths(
+ Seq("file:/test_path/", "test://test/s", "test://test1/s"))
+ assert(filteredRootPath.length == 1)
+ assert(filteredRootPath(0).startsWith("test://"))
+ assert(
+ VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+ Array("file:/test_path/")))
+ assert(
+ !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+ Array("unsupported://test_path")))
+ assert(
+ !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
+ Array("file:/test_path/", "unsupported://test_path")))
+ }
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index cb49abd7d..5df3a478e 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -33,6 +33,7 @@
#include "utils/ObjectStore.h"
#include "utils/VeloxBatchResizer.h"
#include "velox/common/base/BloomFilter.h"
+#include "velox/common/file/FileSystems.h"
#include <iostream>
@@ -260,6 +261,24 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
JNI_METHOD_END(gluten::kInvalidObjectHandle)
}
+JNIEXPORT jboolean JNICALL
+Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByRegisteredFileSystems(
// NOLINT
+ JNIEnv* env,
+ jclass,
+ jobjectArray stringArray) {
+ JNI_METHOD_START
+ int size = env->GetArrayLength(stringArray);
+ for (int i = 0; i < size; i++) {
+ jstring string = (jstring)(env->GetObjectArrayElement(stringArray, i));
+ std::string path = jStringToCString(env, string);
+ if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) {
+ return false;
+ }
+ }
+ return true;
+ JNI_METHOD_END(false)
+}
+
#ifdef __cplusplus
}
#endif
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 358043cc5..c9a0301b8 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -31,10 +31,11 @@ import
org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
import org.apache.spark.sql.types.StructField
trait BackendSettingsApi {
- def supportFileFormatRead(
+ def validateScan(
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
+ rootPaths: Seq[String],
paths: Seq[String]): ValidationResult = ValidationResult.succeeded
def supportWriteFilesExec(
format: FileFormat,
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
index b34abd490..1a0ff3f84 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
@@ -32,4 +32,6 @@ trait BaseDataSource {
/** Returns the input file paths, used to validate the partition column path
*/
def getInputFilePathsInternal: Seq[String]
+
+ def getRootPathsInternal: Seq[String]
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 04697280d..b7953b3ac 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
import org.apache.gluten.extension.ValidationResult
@@ -59,6 +60,14 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
}
}
+ def getRootFilePaths: Seq[String] = {
+ if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
+ getRootPathsInternal
+ } else {
+ Seq.empty
+ }
+ }
+
/** Returns the file format properties. */
def getProperties: Map[String, String] = Map.empty
@@ -92,7 +101,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
}
val validationResult = BackendsApiManager.getSettings
- .supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty,
getInputFilePaths)
+ .validateScan(
+ fileFormat,
+ fields,
+ getPartitionSchema.nonEmpty,
+ getRootFilePaths,
+ getInputFilePaths)
if (!validationResult.ok()) {
return validationResult
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 4860847de..553c7c4e0 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.utils.FileIndexUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -131,6 +132,14 @@ abstract class BatchScanExecTransformerBase(
}
}
+ override def getRootPathsInternal: Seq[String] = {
+ scan match {
+ case fileScan: FileScan =>
+ FileIndexUtil.getRootPath(fileScan.fileIndex)
+ case _ => Seq.empty
+ }
+ }
+
override def doValidateInternal(): ValidationResult = {
if (pushedAggregate.nonEmpty) {
return ValidationResult.failed(s"Unsupported aggregation push down for
$scan.")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 3b8ed1167..af49cfd1b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.utils.FileIndexUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, PlanExpression}
@@ -126,6 +127,10 @@ abstract class FileSourceScanExecTransformerBase(
relation.location.inputFiles.toSeq
}
+ override def getRootPathsInternal: Seq[String] = {
+ FileIndexUtil.getRootPath(relation.location)
+ }
+
override protected def doValidateInternal(): ValidationResult = {
if (
!metadataColumns.isEmpty &&
!BackendsApiManager.getSettings.supportNativeMetadataColumns()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
similarity index 56%
copy from
gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
copy to gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
index b34abd490..dab593b24 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/FileIndexUtil.scala
@@ -14,22 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.utils
-import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources._
-trait BaseDataSource {
-
- /** Returns the actual schema of this data source scan. */
- def getDataSchema: StructType
-
- /** Returns the required partition schema, used to generate partition
column. */
- def getPartitionSchema: StructType
-
- /** Returns the partitions generated by this data source scan. */
- def getPartitions: Seq[InputPartition]
-
- /** Returns the input file paths, used to validate the partition column path
*/
- def getInputFilePathsInternal: Seq[String]
+object FileIndexUtil {
+ def getRootPath(index: FileIndex): Seq[String] = {
+ index.rootPaths
+ .filter(_.isAbsolute)
+ .map(_.toString)
+ .toSeq
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 8133f1d42..938bac2b1 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -76,6 +76,9 @@ case class HiveTableScanExecTransformer(
Seq.empty
}
+ // TODO: get root paths from hive table.
+ override def getRootPathsInternal: Seq[String] = Seq.empty
+
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics)
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 5a735b802..9fb8521d9 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -54,6 +54,9 @@ case class IcebergScanTransformer(
override def getInputFilePathsInternal: Seq[String] = Seq.empty
+ // TODO: get root paths from table.
+ override def getRootPathsInternal: Seq[String] = Seq.empty
+
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9ddce7191..b1ef4be5c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -116,6 +116,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def forceParquetTimestampTypeScanFallbackEnabled: Boolean =
conf.getConf(VELOX_FORCE_PARQUET_TIMESTAMP_TYPE_SCAN_FALLBACK)
+ def scanFileSchemeValidationEnabled: Boolean =
+ conf.getConf(VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED)
+
// whether to use ColumnarShuffleManager
def isUseColumnarShuffleManager: Boolean =
conf
@@ -2009,6 +2012,16 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)
+ val VELOX_SCAN_FILE_SCHEME_VALIDATION_ENABLED =
+ buildConf("spark.gluten.sql.scan.fileSchemeValidation.enabled")
+ .internal()
+ .doc(
+ "When true, enable file path scheme validation for scan. Validation
will fail if" +
+ " file scheme is not supported by registered file systems, which
will cause the scan" +
+ " operator to fall back.")
+ .booleanConf
+ .createWithDefault(true)
+
val COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED =
buildConf("spark.gluten.sql.columnar.cast.avg")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]