yihua commented on code in PR #18265:
URL: https://github.com/apache/hudi/pull/18265#discussion_r3036406437


##########
hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java:
##########
@@ -64,13 +65,17 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata {
 
   private static final int DEFAULT_LISTING_PARALLELISM = 1500;
 
+  @Getter

Review Comment:
   🤖 Is `getDatabaseName()` guaranteed to be non-null here? For tables created 
without a database name in the table config, this would store null. If the 
catalog-backed path later uses this database name to look up partitions, that 
could fail. Have you verified the behavior for tables without an explicit 
database name?
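
   A minimal sketch of the null handling I have in mind, in plain Java. 
`DatabaseNameSketch` and `configuredDatabase` are hypothetical names used only 
for illustration, not Hudi APIs. The idea is that a missing or empty database 
name becomes an explicit `Optional.empty()` that the catalog-backed path has to 
handle, rather than a null that surfaces later:

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of Hudi.
final class DatabaseNameSketch {

  // Turns a possibly-null or blank configured name into an Optional,
  // so callers cannot forget the "no database configured" case.
  static Optional<String> configuredDatabase(String fromTableConfig) {
    return Optional.ofNullable(fromTableConfig)
        .map(String::trim)
        .filter(name -> !name.isEmpty());
  }

  public static void main(String[] args) {
    System.out.println(configuredDatabase(null).isPresent());
    System.out.println(configuredDatabase("sales").orElse("<none>"));
  }
}
```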



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -134,6 +134,13 @@ List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> re
                                                                    Types.RecordType partitionFields,
                                                                    Expression expression) throws IOException;
 
+  default List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes,

Review Comment:
   🤖 Using `List<Object>` for `partitionPredicateExpressions` loses all type 
safety. Callers will need to cast blindly, and any mismatch will only surface 
as a runtime `ClassCastException`. Could this be a generic type parameter on 
the interface, or would a more specific type (even a simple wrapper) work here 
to avoid the untyped `Object` list?
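
   To illustrate the difference, here is a small self-contained sketch. 
`EnginePredicate` and `PredicateAwareListing` are hypothetical stand-ins (not 
Hudi or Spark types); the point is only that the generic variant moves the 
mismatch from runtime to compile time:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical types for illustration; not Hudi or Spark APIs.
final class TypedPredicatesSketch {

  // Stand-in for an engine-specific expression type.
  static final class EnginePredicate {
    final String column;

    EnginePredicate(String column) {
      this.column = column;
    }
  }

  // Untyped variant: every element needs a blind cast.
  static List<String> columnsUntyped(List<Object> predicates) {
    List<String> columns = new ArrayList<>();
    for (Object p : predicates) {
      // Throws ClassCastException if a caller passed anything else.
      columns.add(((EnginePredicate) p).column);
    }
    return columns;
  }

  // Generic variant: the element type is fixed by the implementation.
  interface PredicateAwareListing<E> {
    List<String> columns(List<E> predicates);
  }

  static final PredicateAwareListing<EnginePredicate> TYPED = predicates -> {
    List<String> columns = new ArrayList<>();
    for (EnginePredicate p : predicates) {
      columns.add(p.column);
    }
    return columns;
  };

  public static void main(String[] args) {
    System.out.println(TYPED.columns(Arrays.asList(new EnginePredicate("dt"))));
    try {
      columnsUntyped(Arrays.<Object>asList("not a predicate"));
    } catch (ClassCastException e) {
      System.out.println("untyped list failed at runtime");
    }
  }
}
```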



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -609,16 +608,9 @@ private void resetTableMetadata(HoodieTableMetadata newTableMetadata) {
     tableMetadata = newTableMetadata;
   }
 
-  private static HoodieTableMetadata createMetadataTable(
-      HoodieEngineContext engineContext,
-      HoodieStorage storage,
-      TableMetadataFactory metadataFactory,
-      HoodieMetadataConfig metadataConfig,
-      StoragePath basePath
-  ) {
-    HoodieTableMetadata newTableMetadata = metadataFactory.create(
-        engineContext, storage, metadataConfig, basePath.toString(), true);
-    return newTableMetadata;
+  protected HoodieTableMetadata createMetadataTable(HoodieEngineContext engineContext) {
+    return metaClient.getTableFormat().getMetadataFactory()
+        .create(engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true);
   }
 

Review Comment:
   🤖 Making `createMetadataTable` protected (overridable) while it's called 
from `doRefresh()` at line 201 during the base class constructor is risky. If a 
subclass overrides this method and accesses subclass-specific fields (e.g., 
catalog reference, spark session), those fields won't be initialized yet when 
the base constructor runs. Could you consider using lazy initialization or a 
post-construction `init()` method instead?
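
   The pitfall can be reproduced in a few lines of plain Java. The class names 
below are hypothetical and deliberately unrelated to the Hudi classes; this is 
a sketch of the general constructor-ordering problem and of one possible lazy 
alternative, not a proposal for the exact shape of the fix:

```java
// Hypothetical classes for illustration; not the Hudi file index classes.
final class ConstructorOverrideSketch {

  // Eager variant: the base constructor calls an overridable method.
  static class EagerBase {
    final String metadata;

    EagerBase() {
      this.metadata = createMetadata(); // runs before subclass fields are set
    }

    protected String createMetadata() {
      return "filesystem";
    }
  }

  static class EagerCatalog extends EagerBase {
    private final String catalogName;

    EagerCatalog(String catalogName) {
      super();
      this.catalogName = catalogName;
    }

    @Override
    protected String createMetadata() {
      return "catalog:" + catalogName; // catalogName is still null here
    }
  }

  // Lazy variant: creation is deferred until first use, after construction.
  static class LazyBase {
    private String metadata;

    protected String createMetadata() {
      return "filesystem";
    }

    final String metadata() {
      if (metadata == null) {
        metadata = createMetadata();
      }
      return metadata;
    }
  }

  static class LazyCatalog extends LazyBase {
    private final String catalogName;

    LazyCatalog(String catalogName) {
      this.catalogName = catalogName;
    }

    @Override
    protected String createMetadata() {
      return "catalog:" + catalogName;
    }
  }

  public static void main(String[] args) {
    System.out.println(new EagerCatalog("hive").metadata);   // catalog:null
    System.out.println(new LazyCatalog("hive").metadata());  // catalog:hive
  }
}
```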



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
+import org.apache.hudi.util.PartitionPathFilterUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+                                 tableConfig: HoodieTableConfig,
+                                 storage: HoodieStorage,
+                                 datasetBasePath: String) extends
+  FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+  private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+  private val catalogTableName = tableConfig.getTableName
+  private lazy val catalogDatabaseName =
+    if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
+      sparkSession.sessionState.catalog.getCurrentDatabase
+    } else {
+      tableConfig.getDatabaseName
+    }
+  private lazy val tableIdentifier = TableIdentifier(catalogTableName, Some(catalogDatabaseName))
+  private lazy val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+

Review Comment:
   🤖 If the table isn't registered in the catalog (e.g., created via DataSource 
API with just a path), `getTableMetadata` will throw `NoSuchTableException`. 
Since `catalogTable` is accessed by `isPartitionedTable` and 
`shouldUseCatalogPartitions` in every partition listing method, the exception 
propagates up without any fallback to `super` (filesystem listing). Could you 
wrap the catalog access in a `Try` and fall back to `super` when the table 
isn't found?
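
   The shape of the fallback, sketched in plain Java rather than Scala so it 
stays self-contained. `TableNotRegisteredException` is a hypothetical stand-in 
for the catalog's "table not found" error, and the two suppliers stand in for 
the catalog listing and the `super` (filesystem) listing; none of these names 
come from the PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of "try the catalog, fall back to filesystem listing".
final class CatalogFallbackSketch {

  // Stand-in for the catalog's "table not found" error.
  static final class TableNotRegisteredException extends RuntimeException {
    TableNotRegisteredException(String table) {
      super("Table not registered in catalog: " + table);
    }
  }

  // Uses the catalog listing when available, otherwise the filesystem listing.
  static List<String> listPartitions(Supplier<List<String>> catalogListing,
                                     Supplier<List<String>> fileSystemListing) {
    try {
      return catalogListing.get();
    } catch (TableNotRegisteredException e) {
      // Path-only tables are not in the catalog; listing storage still works.
      return fileSystemListing.get();
    }
  }

  public static void main(String[] args) {
    List<String> result = listPartitions(
        () -> { throw new TableNotRegisteredException("trips"); },
        () -> Arrays.asList("dt=2024-01-01", "dt=2024-01-02"));
    System.out.println(result);
  }
}
```

   Only the "not found" case is caught here on purpose, so that other catalog 
failures would still surface instead of being silently masked by the fallback.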



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+                                 tableConfig: HoodieTableConfig,
+                                 storage: HoodieStorage,
+                                 datasetBasePath: String) extends
+  FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+  private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+  private val catalogTableName = tableConfig.getTableName
+  private lazy val catalogDatabaseName =
+    if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {

Review Comment:
   🤖 When `getDatabaseName` returns null/empty, this falls back to 
`getCurrentDatabase`, which reflects the session's current database context — 
not necessarily where the table is registered. If a user runs `USE 
some_other_db` before querying, this would look up the wrong database and 
either throw `NoSuchTableException` or find a different table with the same 
name. Would it be safer to resolve the database from the catalog table's 
metadata (e.g., via the table's location path) or at least document this 
assumption?
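
   One possible guard, sketched in plain Java: before trusting a table that 
was resolved through the session's current database, compare its catalog 
location with the Hudi base path. `CatalogLocationCheckSketch` and 
`sameLocation` are hypothetical names, and the normalization is deliberately 
simplistic (it assumes both inputs are well-formed URIs without spaces):

```java
import java.net.URI;

// Hypothetical check for illustration; not part of Hudi or Spark.
final class CatalogLocationCheckSketch {

  // True when the catalog table's location points at the same base path.
  static boolean sameLocation(String catalogLocation, String tableBasePath) {
    return normalize(catalogLocation).equals(normalize(tableBasePath));
  }

  // Resolves "." and ".." segments and drops a single trailing slash.
  private static String normalize(String path) {
    String normalized = URI.create(path).normalize().toString();
    if (normalized.length() > 1 && normalized.endsWith("/")) {
      return normalized.substring(0, normalized.length() - 1);
    }
    return normalized;
  }

  public static void main(String[] args) {
    String basePath = "s3://bucket/warehouse/db/trips";
    System.out.println(sameLocation("s3://bucket/warehouse/db/trips/", basePath));
    System.out.println(sameLocation("s3://bucket/warehouse/other_db/trips", basePath));
  }
}
```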



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -122,6 +123,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
 
   protected lazy val shouldFastBootstrap = 
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
 
+  lazy val isPartitionListingViaCatalogEnabled: Boolean = {
+    configProperties.getBoolean(FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.key,
+      FILE_INDEX_PARTITION_LISTING_VIA_CATALOG.defaultValue()) &&
+      !metaClient.getTableConfig.isMetadataTableAvailable

Review Comment:
   🤖 I share this concern. The current condition requires the user to 
explicitly disable MDT *and* enable the catalog config. But the motivation is 
handling corrupted MDT — in that scenario, MDT is still technically enabled in 
the table config, it's just broken. So the user would need to first disable 
MDT, then enable catalog listing, which is two steps. It might be worth 
checking whether MDT is *functional* (e.g., metadata table exists and is 
accessible) rather than just whether the config is enabled.
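
   Roughly, the decision could look like the sketch below. The method and 
parameter names are hypothetical, and `mdtProbe` stands in for whatever cheap 
check would tell us the metadata table is actually readable; how such a probe 
would be implemented in Hudi is left open here:

```java
import java.util.function.BooleanSupplier;

// Hypothetical decision logic for illustration; not the PR's implementation.
final class MdtAvailabilitySketch {

  // Decides whether partition listing should go through the catalog.
  static boolean useCatalogListing(boolean catalogListingEnabled,
                                   boolean mdtEnabledInConfig,
                                   BooleanSupplier mdtProbe) {
    if (!catalogListingEnabled) {
      return false;
    }
    if (!mdtEnabledInConfig) {
      return true;
    }
    try {
      // MDT is enabled in config: use the catalog only if MDT is not usable.
      return !mdtProbe.getAsBoolean();
    } catch (RuntimeException e) {
      // A failing probe is treated as "MDT is broken".
      return true;
    }
  }

  public static void main(String[] args) {
    boolean corrupted = useCatalogListing(true, true, () -> {
      throw new IllegalStateException("metadata table unreadable");
    });
    System.out.println(corrupted);
  }
}
```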



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,123 @@
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+                                 tableConfig: HoodieTableConfig,
+                                 storage: HoodieStorage,
+                                 datasetBasePath: String) extends
+  FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+  private val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+  private val catalogTableName = tableConfig.getTableName
+  private lazy val catalogDatabaseName =
+    if (StringUtils.isNullOrEmpty(tableConfig.getDatabaseName)) {
+      sparkSession.sessionState.catalog.getCurrentDatabase
+    } else {
+      tableConfig.getDatabaseName
+    }
+  private lazy val tableIdentifier = TableIdentifier(catalogTableName, Some(catalogDatabaseName))
+  private lazy val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+
+  private def isPartitionedTable: Boolean = {
+    catalogTable.partitionColumnNames.nonEmpty
+  }
+
+  private def shouldUseCatalogPartitions: Boolean = {
+    isPartitionedTable && catalogTable.tracksPartitionsInCatalog
+  }
+
+  override def getAllPartitionPaths():
+  util.List[String] =
+    if (!isPartitionedTable) {
+      util.Collections.emptyList()
+    } else if (shouldUseCatalogPartitions) {
+      sparkSession.sessionState.catalog.externalCatalog
+        .listPartitions(catalogDatabaseName, catalogTableName)
+        .map(catalogTablePartition => {
+          val partitionPathURI = new StoragePath(catalogTablePartition.location)
+          FSUtils.getRelativePartitionPath(dataBasePath, partitionPathURI)
+        }).asJava
+    } else {
+      super.getAllPartitionPaths()
+    }
+
+  override def getPartitionPathWithPathPrefixes(relativePathPrefixes: util.List[String]):
+  util.List[String] =
+    if (!isPartitionedTable) {
+      util.Collections.emptyList()
+    } else if (shouldUseCatalogPartitions) {
+      filterPartitionsBasedOnRelativePathPrefixes(relativePathPrefixes,
+        sparkSession.sessionState.catalog.externalCatalog
+          .listPartitions(catalogDatabaseName, catalogTableName))
+    } else {
+      super.getPartitionPathWithPathPrefixes(relativePathPrefixes)
+    }
+
+  override def getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix: util.List[String],
+                                                                   partitionFields: Types.RecordType,
+                                                                   pushedExpr: org.apache.hudi.expression.Expression,
+                                                                   partitionPredicateExpressions: util.List[Object]):
+  util.List[String] = {
+    if (!isPartitionedTable) {
+      util.Collections.emptyList()
+    } else if (shouldUseCatalogPartitions) {
+      val partitionPredicateExpressionSeq = partitionPredicateExpressions.asScala.map(_.asInstanceOf[Expression]).toSeq

Review Comment:
   🤖 The `else` branch falls back to the 3-param `super` call (without 
`partitionPredicateExpressions`), which means the Spark partition predicate 
expressions are silently dropped when the catalog isn't available. Is this 
intentional? In that case the filesystem listing only uses the Hudi 
`pushedExpr` for filtering, which should still be correct, but I wanted to 
confirm no filter information is lost here.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/metadata/CatalogBackedTableMetadata.scala:
##########
@@ -0,0 +1,92 @@
+package org.apache.hudi.metadata
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.HoodieEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.storage.{HoodieStorage, StoragePath}
+import org.apache.hudi.util.PartitionPathFilterUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class CatalogBackedTableMetadata(engineContext: HoodieEngineContext,
+                                 tableConfig: HoodieTableConfig,
+                                 storage: HoodieStorage,
+                                 datasetBasePath: String) extends
+  FileSystemBackedTableMetadata(engineContext, tableConfig, storage, datasetBasePath) with Logging {
+
+  lazy val sparkSession = engineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext.sparkSession
+
+  override def getAllPartitionPaths():
+  util.List[String] = {
+    val catalogTablePartitionSeq =
+      sparkSession.sessionState.catalog.externalCatalog
+        .listPartitions(getDatabaseName, getTableName)
+    catalogTablePartitionSeq
+      .map(catalogTablePartition => {
+        val partitionPathURI = new StoragePath(catalogTablePartition.location)
+        FSUtils.getRelativePartitionPath(dataBasePath, partitionPathURI)
+      }).asJava
+  }
+
+  override def getPartitionPathWithPathPrefixes(relativePathPrefixes: util.List[String]):
+  util.List[String] = {
+    val catalogTablePartitionSeq =
+      sparkSession.sessionState.catalog.externalCatalog
+        .listPartitions(getDatabaseName, getTableName)
+    filterPartitionsBasedOnRelativePathPrefixs(relativePathPrefixes, catalogTablePartitionSeq)
+  }
+
+  override def getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix: util.List[String],
+                                                                   partitionFields: Types.RecordType,
+                                                                   pushedExpr: org.apache.hudi.expression.Expression,

Review Comment:
   🤖 The author's point about it being part of the parent class signature is 
fair, but I think the concern still stands. In `CatalogBackedTableMetadata`, 
`partitionPredicateExpressions` *is* used (line 95: 
`partitionPredicateExpressions.asScala.map(_.asInstanceOf[Expression])`). The 
issue is that `pushedExpr` (the Hudi internal `Expression`) is ignored while 
the Spark `Expression` list is used instead. If `partitionPredicateExpressions` 
is null (which is possible since the interface's default method delegates to 
the 3-arg version without it), the conversion at that line would throw a 
`NullPointerException`.
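
   A defensive option is to normalize a null list to an empty one before 
converting it. The helper below is a hypothetical sketch in plain Java 
(`NullSafePredicatesSketch.orEmpty` is not an existing Hudi utility):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Hudi.
final class NullSafePredicatesSketch {

  // Treats a missing predicate list as "no predicates" instead of failing.
  static <E> List<E> orEmpty(List<E> maybeNull) {
    return maybeNull == null ? Collections.<E>emptyList() : maybeNull;
  }

  public static void main(String[] args) {
    List<String> none = orEmpty(null);
    System.out.println(none.isEmpty());
    System.out.println(orEmpty(Arrays.asList("dt = '2024-01-01'")).size());
  }
}
```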


