This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dceeb85cd [GLUTEN-7394][CH]Reduce the times of the calling listFiles 
when executing query from the parquet file format (#7417)
3dceeb85cd is described below

commit 3dceeb85cd8cf15a6a8630f74235033a507eea9e
Author: Zhichao Zhang <[email protected]>
AuthorDate: Tue Oct 8 09:46:31 2024 +0800

    [GLUTEN-7394][CH]Reduce the times of the calling listFiles when executing 
query from the parquet file format (#7417)
    
    * [GLUTEN-7394][CH]Reduce the times of the calling listFiles when executing 
query from the mergetree file format
    
    Reduce the times of the calling listFiles when executing query from the 
mergetree file format
    
    Close #7394.
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  | 22 ++-----------
 .../backendsapi/clickhouse/CHIteratorApi.scala     | 37 ++++++++++++++++++++--
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  4 +--
 cpp-ch/local-engine/Common/GlutenStringUtils.cpp   | 34 ++------------------
 cpp-ch/local-engine/Common/GlutenStringUtils.h     |  5 ++-
 .../Storages/SubstraitSource/FormatFile.cpp        | 19 +++++++----
 cpp-ch/local-engine/tests/gtest_utils.cpp          | 31 ------------------
 .../gluten/execution/IcebergScanTransformer.scala  |  2 --
 .../gluten/backendsapi/BackendSettingsApi.scala    |  8 ++---
 .../apache/gluten/execution/BaseDataSource.scala   |  3 --
 .../execution/BasicScanExecTransformer.scala       | 17 +---------
 .../execution/BatchScanExecTransformer.scala       |  7 ----
 .../execution/FileSourceScanExecTransformer.scala  |  4 ---
 .../sql/hive/HiveTableScanExecTransformer.scala    |  5 ---
 14 files changed, 59 insertions(+), 139 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 725e90e2ad..59d912d8e7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -145,18 +145,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
   override def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
-      partTable: Boolean,
-      rootPaths: Seq[String],
-      paths: Seq[String]): ValidationResult = {
-
-    def validateFilePath: Boolean = {
-      // Fallback to vanilla spark when the input path
-      // does not contain the partition info.
-      if (partTable && !paths.forall(_.contains("="))) {
-        return false
-      }
-      true
-    }
+      rootPaths: Seq[String]): ValidationResult = {
 
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
@@ -176,12 +165,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       !unsupportedDataTypes.isEmpty
     }
     format match {
-      case ParquetReadFormat =>
-        if (validateFilePath) {
-          ValidationResult.succeeded
-        } else {
-          ValidationResult.failed("Validate file path failed.")
-        }
+      case ParquetReadFormat => ValidationResult.succeeded
       case OrcReadFormat => ValidationResult.succeeded
       case MergeTreeReadFormat => ValidationResult.succeeded
       case TextReadFormat =>
@@ -343,8 +327,6 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
 
   override def transformCheckOverflow: Boolean = false
 
-  override def requiredInputFilePaths(): Boolean = true
-
   override def requireBloomFilterAggMightContainJointFallback(): Boolean = 
false
 
   def maxShuffleReadRows(): Long = {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 0a3dbc3f5a..3c834b7ca8 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -34,16 +34,20 @@ import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.CHColumnarShuffleWriter
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
 import 
org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer,
 ExtensionTableBuilder, ExtensionTableNode}
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import java.lang.{Long => JLong}
 import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.time.ZoneOffset
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
 
 import scala.collection.JavaConverters._
@@ -156,14 +160,41 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
         val fileSizes = new JArrayList[JLong]()
         val modificationTimes = new JArrayList[JLong]()
         val partitionColumns = new JArrayList[JMap[String, String]]
+        val metadataColumns = new JArrayList[JMap[String, String]]
         f.files.foreach {
           file =>
             paths.add(new URI(file.filePath.toString()).toASCIIString)
             starts.add(JLong.valueOf(file.start))
             lengths.add(JLong.valueOf(file.length))
-            // TODO: Support custom partition location
+            val metadataColumn =
+              SparkShimLoader.getSparkShims.generateMetadataColumns(file, 
metadataColumnNames)
+            metadataColumns.add(metadataColumn)
             val partitionColumn = new JHashMap[String, String]()
+            for (i <- 0 until file.partitionValues.numFields) {
+              val partitionColumnValue = if (file.partitionValues.isNullAt(i)) 
{
+                ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+              } else {
+                val pn = file.partitionValues.get(i, 
partitionSchema.fields(i).dataType)
+                partitionSchema.fields(i).dataType match {
+                  case _: BinaryType =>
+                    new String(pn.asInstanceOf[Array[Byte]], 
StandardCharsets.UTF_8)
+                  case _: DateType =>
+                    DateFormatter.apply().format(pn.asInstanceOf[Integer])
+                  case _: DecimalType =>
+                    pn.asInstanceOf[Decimal].toJavaBigInteger.toString
+                  case _: TimestampType =>
+                    TimestampFormatter
+                      .getFractionFormatter(ZoneOffset.UTC)
+                      .format(pn.asInstanceOf[java.lang.Long])
+                  case _ => pn.toString
+                }
+              }
+              partitionColumn.put(
+                ConverterUtils.normalizeColName(partitionSchema.names(i)),
+                partitionColumnValue)
+            }
             partitionColumns.add(partitionColumn)
+
             val (fileSize, modificationTime) =
               
SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
             (fileSize, modificationTime) match {
@@ -185,7 +216,7 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
           fileSizes,
           modificationTimes,
           partitionColumns,
-          new JArrayList[JMap[String, String]](),
+          metadataColumns,
           fileFormat,
           preferredLocations.toList.asJava,
           mapAsJavaMap(properties)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 56dc92f420..939fc7f04f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -93,9 +93,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
   override def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
-      partTable: Boolean,
-      rootPaths: Seq[String],
-      paths: Seq[String]): ValidationResult = {
+      rootPaths: Seq[String]): ValidationResult = {
     val filteredRootPaths = distinctRootPaths(rootPaths)
     if (
       filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp 
b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp
index 4a18f4ceda..858099fff9 100644
--- a/cpp-ch/local-engine/Common/GlutenStringUtils.cpp
+++ b/cpp-ch/local-engine/Common/GlutenStringUtils.cpp
@@ -22,48 +22,20 @@
 
 namespace local_engine
 {
-PartitionValues GlutenStringUtils::parsePartitionTablePath(const std::string & 
file)
-{
-    PartitionValues result;
-    Poco::StringTokenizer path(file, "/");
-    for (const auto & item : path)
-    {
-        auto pos = item.find('=');
-        if (pos != std::string::npos)
-        {
-            auto key = boost::to_lower_copy(item.substr(0, pos));
-            auto value = item.substr(pos + 1);
-
-            std::string unescaped_key;
-            std::string unescaped_value;
-            Poco::URI::decode(key, unescaped_key);
-            Poco::URI::decode(value, unescaped_value);
-            result.emplace_back(std::move(unescaped_key), 
std::move(unescaped_value));
-        }
-    }
-    return result;
-}
 
 bool GlutenStringUtils::isNullPartitionValue(const std::string & value)
 {
     return value == "__HIVE_DEFAULT_PARTITION__";
 }
 
-std::string GlutenStringUtils::dumpPartitionValue(const PartitionValue & value)
-{
-    return value.first + "=" + value.second;
-}
-
-std::string GlutenStringUtils::dumpPartitionValues(const PartitionValues & 
values)
+std::string GlutenStringUtils::dumpPartitionValues(const std::map<std::string, 
std::string> & values)
 {
     std::string res;
     res += "[";
 
-    for (size_t i = 0; i < values.size(); ++i)
+    for (const auto & [key, value] : values)
     {
-        if (i)
-            res += ", ";
-        res += dumpPartitionValue(values[i]);
+        res += key + "=" + value + ", ";
     }
 
     res += "]";
diff --git a/cpp-ch/local-engine/Common/GlutenStringUtils.h 
b/cpp-ch/local-engine/Common/GlutenStringUtils.h
index dd04413532..0d980f228f 100644
--- a/cpp-ch/local-engine/Common/GlutenStringUtils.h
+++ b/cpp-ch/local-engine/Common/GlutenStringUtils.h
@@ -17,6 +17,7 @@
 #pragma once
 #include <string>
 #include <vector>
+#include <map>
 
 namespace local_engine
 {
@@ -26,10 +27,8 @@ using PartitionValues = std::vector<PartitionValue>;
 class GlutenStringUtils
 {
 public:
-    static PartitionValues parsePartitionTablePath(const std::string & file);
     static bool isNullPartitionValue(const std::string & value);
 
-    static std::string dumpPartitionValue(const PartitionValue & value);
-    static std::string dumpPartitionValues(const PartitionValues & values);
+    static std::string dumpPartitionValues(const std::map<std::string, 
std::string> & values);
 };
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
index 1097abe6e6..fc5acc533d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -51,12 +51,19 @@ FormatFile::FormatFile(
     const ReadBufferBuilderPtr & read_buffer_builder_)
     : context(context_), file_info(file_info_), 
read_buffer_builder(read_buffer_builder_)
 {
-    PartitionValues part_vals = 
GlutenStringUtils::parsePartitionTablePath(file_info.uri_file());
-    for (size_t i = 0; i < part_vals.size(); ++i)
+    if (file_info.partition_columns_size())
     {
-        const auto & part = part_vals[i];
-        partition_keys.push_back(part.first);
-        partition_values[part.first] = part.second;
+        for (size_t i = 0; i < file_info.partition_columns_size(); ++i)
+        {
+            const auto & partition_column = file_info.partition_columns(i);
+            std::string unescaped_key;
+            std::string unescaped_value;
+            Poco::URI::decode(partition_column.key(), unescaped_key);
+            Poco::URI::decode(partition_column.value(), unescaped_value);
+            auto key = std::move(unescaped_key);
+            partition_keys.push_back(key);
+            partition_values[key] = std::move(unescaped_value);
+        }
     }
 
     LOG_INFO(
@@ -66,7 +73,7 @@ FormatFile::FormatFile(
         file_info.file_format_case(),
         std::to_string(file_info.start()) + "-" + 
std::to_string(file_info.start() + file_info.length()),
         file_info.partition_index(),
-        GlutenStringUtils::dumpPartitionValues(part_vals));
+        GlutenStringUtils::dumpPartitionValues(partition_values));
 }
 
 FormatFilePtr FormatFileUtil::createFile(
diff --git a/cpp-ch/local-engine/tests/gtest_utils.cpp 
b/cpp-ch/local-engine/tests/gtest_utils.cpp
deleted file mode 100644
index 4ea713921f..0000000000
--- a/cpp-ch/local-engine/tests/gtest_utils.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <gtest/gtest.h>
-#include <Common/GlutenStringUtils.h>
-
-using namespace local_engine;
-
-TEST(TestStringUtils, TestExtractPartitionValues)
-{
-    std::string path = "/tmp/col1=1/col2=test/a.parquet";
-    auto values = GlutenStringUtils::parsePartitionTablePath(path);
-    ASSERT_EQ(2, values.size());
-    ASSERT_EQ("col1", values[0].first);
-    ASSERT_EQ("1", values[0].second);
-    ASSERT_EQ("col2", values[1].first);
-    ASSERT_EQ("test", values[1].second);
-}
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 60f8a60064..1cbeb52a92 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -53,8 +53,6 @@ case class IcebergScanTransformer(
 
   override def getDataSchema: StructType = new StructType()
 
-  override def getInputFilePathsInternal: Seq[String] = Seq.empty
-
   // TODO: get root paths from table.
   override def getRootPathsInternal: Seq[String] = Seq.empty
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 7d07431a87..f1f46dd87e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -33,14 +33,14 @@ trait BackendSettingsApi {
   def validateScan(
       format: ReadFileFormat,
       fields: Array[StructField],
-      partTable: Boolean,
-      rootPaths: Seq[String],
-      paths: Seq[String]): ValidationResult = ValidationResult.succeeded
+      rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded
+
   def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult = 
ValidationResult.succeeded
+
   def supportNativeWrite(fields: Array[StructField]): Boolean = true
   def supportNativeMetadataColumns(): Boolean = false
   def supportNativeRowIndexColumn(): Boolean = false
@@ -112,8 +112,6 @@ trait BackendSettingsApi {
 
   def staticPartitionWriteOnly(): Boolean = false
 
-  def requiredInputFilePaths(): Boolean = false
-
   // TODO: Move this to test settings as used in UT only.
   def requireBloomFilterAggMightContainJointFallback(): Boolean = true
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
index 1a0ff3f845..e0621a20de 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BaseDataSource.scala
@@ -30,8 +30,5 @@ trait BaseDataSource {
   /** Returns the partitions generated by this data source scan. */
   def getPartitions: Seq[InputPartition]
 
-  /** Returns the input file paths, used to validate the partition column path 
*/
-  def getInputFilePathsInternal: Seq[String]
-
   def getRootPathsInternal: Seq[String]
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 419c22d6c3..912b93079f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -50,16 +50,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   /** This can be used to report FileFormat for a file based scan operator. */
   val fileFormat: ReadFileFormat
 
-  // TODO: Remove this expensive call when CH support scan custom partition 
location.
-  def getInputFilePaths: Seq[String] = {
-    // This is a heavy operation, and only the required backend executes the 
corresponding logic.
-    if (BackendsApiManager.getSettings.requiredInputFilePaths()) {
-      getInputFilePathsInternal
-    } else {
-      Seq.empty
-    }
-  }
-
   def getRootFilePaths: Seq[String] = {
     if (GlutenConfig.getConf.scanFileSchemeValidationEnabled) {
       getRootPathsInternal
@@ -101,12 +91,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
     }
 
     val validationResult = BackendsApiManager.getSettings
-      .validateScan(
-        fileFormat,
-        fields,
-        getPartitionSchema.nonEmpty,
-        getRootFilePaths,
-        getInputFilePaths)
+      .validateScan(fileFormat, fields, getRootFilePaths)
     if (!validationResult.ok()) {
       return validationResult
     }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 553c7c4e0e..e1a1be8e29 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -125,13 +125,6 @@ abstract class BatchScanExecTransformerBase(
     case _ => new StructType()
   }
 
-  override def getInputFilePathsInternal: Seq[String] = {
-    scan match {
-      case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq
-      case _ => Seq.empty
-    }
-  }
-
   override def getRootPathsInternal: Seq[String] = {
     scan match {
       case fileScan: FileScan =>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index af49cfd1ba..d64c5ae016 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -123,10 +123,6 @@ abstract class FileSourceScanExecTransformerBase(
 
   override def getDataSchema: StructType = relation.dataSchema
 
-  override def getInputFilePathsInternal: Seq[String] = {
-    relation.location.inputFiles.toSeq
-  }
-
   override def getRootPathsInternal: Seq[String] = {
     FileIndexUtil.getRootPath(relation.location)
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 938bac2b1b..85432350d4 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -71,11 +71,6 @@ case class HiveTableScanExecTransformer(
 
   override def getDataSchema: StructType = relation.tableMeta.dataSchema
 
-  override def getInputFilePathsInternal: Seq[String] = {
-    // FIXME how does a hive table expose file paths?
-    Seq.empty
-  }
-
   // TODO: get root paths from hive table.
   override def getRootPathsInternal: Seq[String] = Seq.empty
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to