nsivabalan commented on code in PR #18136:
URL: https://github.com/apache/hudi/pull/18136#discussion_r2791236189


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -165,14 +170,17 @@ public BaseHoodieTableFileIndex(HoodieEngineContext 
engineContext,
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElseGet(() -> new String[0]);
 
+    // Disable metadata when ro_path_filter is enabled.
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
         .enable(configProperties.getBoolean(ENABLE.key(), 
DEFAULT_METADATA_ENABLE_FOR_READERS)
-            && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
+            && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)

Review Comment:
   hey @yihua : lets chat about this as well tomorrow.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();

Review Comment:
   can we move this to a private method. 
   `generatePartitionFileSlicesPostROTablePathFilter`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,6 +528,21 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    var hoodieROTablePathFilterBasedFileListingEnabled = 
getConfigValue(options, sqlConf,

Review Comment:
   once we fix the config key, lets fix these vars as well



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -227,6 +227,18 @@ object DataSourceReadOptions {
         " by carefully analyzing provided partition-column predicates and 
deducing corresponding partition-path prefix from " +
         " them (if possible).")
 
+  val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER: 
ConfigProperty[Boolean] =
+    
ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter")

Review Comment:
   `hoodie.datasource.read.file.index.optimize.listing.using.path.filter`



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##########
@@ -99,25 +100,25 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
 
   private transient HoodieLocalEngineContext engineContext;
 
-
   private transient HoodieStorage storage;
 
   public HoodieROTablePathFilter() {
-    this(new Configuration());
+    this(HadoopFSUtils.getStorageConf());
   }
 
-  public HoodieROTablePathFilter(Configuration conf) {
+  @VisibleForTesting
+  public HoodieROTablePathFilter(StorageConfiguration storageConf) {
     this.hoodiePathCache = new ConcurrentHashMap<>();
     this.nonHoodiePathCache = new HashSet<>();
-    this.conf = HadoopFSUtils.getStorageConfWithCopy(conf);
+    this.conf = storageConf;
     this.metaClientCache = new HashMap<>();
     this.completedTimelineCache =  new HashMap<>();
   }
 
   /**
    * By passing metaClient and completedTimeline, we can sync the view seen 
from this class against HoodieFileIndex class
    */
-  public HoodieROTablePathFilter(Configuration conf,
+  public HoodieROTablePathFilter(StorageConfiguration conf,

Review Comment:
   hey @yihua : can you review the changes in this patch



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+/**
+ * Tests for ROPathFilter optimization with advanced scenarios and edge cases.
+ */
+class TestROPathFilterAdvanced extends HoodieSparkSqlTestBase {
+
+  val RO_PATH_FILTER_OPT_KEY = 
"hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter"
+
+  test("Test ROPathFilter with empty table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Query empty table with ROPathFilter enabled
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select * from $tableName").collect()
+        assert(result.length == 0)
+      }
+    }
+  }
+
+  test("Test ROPathFilter with partition pruning") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+           | partitioned by (dt)
+       """.stripMargin)
+
+      // Query empty table with ROPathFilter enabled
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select * from $tableName").collect()
+        assert(result.length == 0)
+      }
+
+      // Insert data across multiple partitions
+      spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000, 
"2024-01-01")""")
+      spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000, 
"2024-01-02")""")
+
+      // Update data in first partition
+      spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+      // Query single partition with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where dt 
= '2024-01-01'")(
+          Seq(1, "a1", 15.0, 1000, "2024-01-01")
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter with concurrent inserts to different partitions") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  region string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+           | partitioned by (region)
+       """.stripMargin)
+
+      // Insert data to different partitions
+      spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000, 
"US")""")
+      spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000, 
"EU")""")
+      spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000, 
"APAC")""")
+      spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000, 
"US")""")
+
+      // Query all data with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts, region from $tableName order 
by id")(
+          Seq(1, "a1", 10.0, 1000, "US"),
+          Seq(2, "a2", 20.0, 2000, "EU"),
+          Seq(3, "a3", 30.0, 3000, "APAC"),
+          Seq(4, "a4", 40.0, 4000, "US")
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter with multiple deletes and updates") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Insert initial data
+      for (i <- 1 to 10) {
+        spark.sql(s"""insert into $tableName values($i, "name$i", ${i * 10.0}, 
${i * 1000})""")
+      }
+
+      // Perform mix of updates and deletes
+      spark.sql(s"update $tableName set price = price * 2 where id % 2 = 0")
+      spark.sql(s"delete from $tableName where id % 3 = 0")
+
+      // Query with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select id, name, price, ts from $tableName 
order by id").collect()
+        // Should have deleted records where id % 3 = 0 (3, 6, 9)
+        // Should have doubled price for even ids (2, 4, 8, 10)
+        assert(result.length == 7) // 10 - 3 deleted = 7

Review Comment:
   do you think below assertion would work.
   
   we can rename one of the earlier versions of a file slice so that 
HoodieBaseFile parsing will fail.
   so, if RO table path filter works as intended, listing files from a given 
partition should not fail, since we won't even try to parse the file. 
   
   but if RO table path filter did not work, it would fail. 
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to