yihua commented on code in PR #18227:
URL: https://github.com/apache/hudi/pull/18227#discussion_r2842311774


##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {
+      while (rs.next()) {
+        String colName = rs.getString(1);
+        String colType = rs.getString(2);
+        String comment = rs.getString(3);
+        if (colName != null && !colName.trim().isEmpty()
+            && !colName.startsWith("#")) {
+          fields.add(new FieldSchema(
+              colName.trim(),
+              colType != null ? colType.trim() : "",
+              comment));
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get field schemas via JDBC for: " + tableName, e);
+    }
+    return fields;
+  }
+
+  /**
+   * Retrieves a single table property via {@code SHOW TBLPROPERTIES}.
+   */
+  public Option<String> getTableProperty(String tableName, String key) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TBLPROPERTIES " + fqTable + " ('" + key + "')")) {
+      if (rs.next()) {
+        String value = rs.getString(2);
+        if (value != null && !value.contains("does not exist")) {

Review Comment:
   The `!value.contains("does not exist")` check seems fragile. Would it be 
safer to check the result set column count or use a more specific pattern match 
on the Hive error format?
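   For illustration, a standalone sketch of what a stricter match could look like (the exact Hive error text varies by version, so the `does not have property:` pattern below is an assumption, and `Optional` stands in for Hudi's `Option`):

```java
import java.util.Optional;
import java.util.regex.Pattern;

public class TblPropertiesParser {

  // Hypothetical pattern for Hive's "missing property" row; anchoring on the
  // requested key avoids misclassifying a legitimate value that merely
  // contains the substring "does not exist".
  private static Pattern missingPropertyPattern(String key) {
    return Pattern.compile("does not have property: " + Pattern.quote(key) + "\\s*$");
  }

  /** Returns the value unless it matches the missing-property error row. */
  public static Optional<String> parseValue(String key, String rawValue) {
    if (rawValue == null || missingPropertyPattern(key).matcher(rawValue).find()) {
      return Optional.empty();
    }
    return Optional.of(rawValue);
  }
}
```

   With this, a stored value like `"value that does not exist yet"` is returned intact, while the error row for a truly missing key maps to empty.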



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {
+      while (rs.next()) {
+        String colName = rs.getString(1);
+        String colType = rs.getString(2);
+        String comment = rs.getString(3);
+        if (colName != null && !colName.trim().isEmpty()
+            && !colName.startsWith("#")) {
+          fields.add(new FieldSchema(
+              colName.trim(),
+              colType != null ? colType.trim() : "",
+              comment));
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get field schemas via JDBC for: " + tableName, e);
+    }
+    return fields;
+  }
+
+  /**
+   * Retrieves a single table property via {@code SHOW TBLPROPERTIES}.
+   */
+  public Option<String> getTableProperty(String tableName, String key) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TBLPROPERTIES " + fqTable + " ('" + key + "')")) {
+      if (rs.next()) {
+        String value = rs.getString(2);
+        if (value != null && !value.contains("does not exist")) {
+          return Option.of(value);
+        }
+      }
+      return Option.empty();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table property via JDBC: " + key, e);
+    }
+  }
+
+  /**
+   * Sets table properties via {@code ALTER TABLE ... SET TBLPROPERTIES}.
+   */
+  public void setTableProperties(String tableName, Map<String, String> properties) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    StringBuilder sb = new StringBuilder("ALTER TABLE ")
+        .append(fqTable).append(" SET TBLPROPERTIES (");
+    boolean first = true;
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("'").append(entry.getKey()).append("'='")
+          .append(entry.getValue()).append("'");

Review Comment:
   This probably shouldn't happen, but as a minor risk: if `entry.getKey()` or `entry.getValue()` contains a single quote (`'`), the statement breaks.
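   A minimal sketch of an escaping helper that would close this gap (assuming backslash escaping inside HiveQL single-quoted string literals):

```java
public class HiveQlEscaper {

  /**
   * Escapes backslashes and single quotes so the result can be embedded
   * inside a HiveQL single-quoted string literal. Backslashes are doubled
   * first so the quote escape itself is not re-escaped.
   */
  public static String escape(String raw) {
    return raw.replace("\\", "\\\\").replace("'", "\\'");
  }
}
```

   The TBLPROPERTIES builder would then append `escape(entry.getKey())` and `escape(entry.getValue())` instead of the raw strings.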



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {

Review Comment:
   nit: it would be good to have an `executeQuery()` helper that takes the statement, a lambda mapping the `ResultSet` `rs` to the return value, and the error message, to improve readability across all such methods.
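   A hedged sketch of such a helper (`RuntimeException` stands in for `HoodieHiveSyncException` so the sketch is self-contained; the self-check uses a `Proxy`-based connection rather than a live HiveServer2):

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class QueryHelper {

  @FunctionalInterface
  public interface ResultSetMapper<T> {
    T map(ResultSet rs) throws SQLException;
  }

  /** Owns the Statement/ResultSet lifecycle and exception wrapping; callers
   *  only supply the SQL, the error message, and a mapping lambda. */
  public static <T> T executeQuery(Connection connection, String sql,
                                   String errorMessage, ResultSetMapper<T> mapper) {
    try (Statement stmt = connection.createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
      return mapper.map(rs);
    } catch (SQLException e) {
      // HoodieHiveSyncException in the real code; RuntimeException keeps the
      // sketch standalone.
      throw new RuntimeException(errorMessage, e);
    }
  }

  /** Self-check without a database: a Connection proxy that always throws
   *  should surface the caller's error message. */
  public static boolean wrapsErrors() {
    Connection bad = (Connection) Proxy.newProxyInstance(
        Connection.class.getClassLoader(), new Class<?>[] {Connection.class},
        (proxy, method, args) -> { throw new SQLException("boom"); });
    try {
      executeQuery(bad, "SELECT 1", "failed", rs -> 1);
      return false;
    } catch (RuntimeException e) {
      return "failed".equals(e.getMessage());
    }
  }
}
```

   `tableExists` would then collapse to a single call such as `executeQuery(connection, showTablesSql, "Failed to check if table exists via JDBC: " + tableName, ResultSet::next)`.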



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -277,9 +381,13 @@ public void createOrReplaceTable(String tableName,
       // and rename temp table to actual table
       dropTable(tableName);
 
-      Table table = client.getTable(databaseName, tempTableName);
-      table.setTableName(tableName);
-      client.alter_table(databaseName, tempTableName, table);
+      if (useJdbcFallback()) {
+        jdbcMetadataOperator.renameTable(tempTableName, tableName);
+      } else {
+        Table table = client.getTable(databaseName, tempTableName);
+        table.setTableName(tableName);
+        client.alter_table(databaseName, tempTableName, table);
+      }
     } catch (Exception ex) {

Review Comment:
   Unlike every other catch block in this class, this one doesn't call 
`detectThriftIncompatibility`. If `createOrReplaceTable` happens to be the 
first method that triggers a `TApplicationException` (e.g., `get_table` is 
renamed in HMS 4.x but `tableExists` uses a different Thrift call that still 
works), the fallback won't kick in. Is there a reason for not calling it here?
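   One way to make this uniform is to route every metastore call through a single try/fallback switch, a hedged sketch of which follows (the helper names and the `"Invalid method name"` detection below are illustrative assumptions, not the PR's actual `detectThriftIncompatibility` implementation):

```java
import java.util.function.Supplier;

public class MetastoreCallRouter {

  private boolean useJdbcFallback = false;

  public boolean isFallbackActive() {
    return useJdbcFallback;
  }

  /** Try the Thrift path first; on a detected incompatibility, flip to the
   *  JDBC path permanently and retry the same operation there. */
  public <T> T call(Supplier<T> thriftPath, Supplier<T> jdbcPath) {
    if (useJdbcFallback) {
      return jdbcPath.get();
    }
    try {
      return thriftPath.get();
    } catch (RuntimeException e) {
      if (isThriftIncompatibility(e)) {
        useJdbcFallback = true;
        return jdbcPath.get();
      }
      throw e;
    }
  }

  // Stand-in detection: Thrift reports unknown methods (e.g., a renamed
  // get_table) with an "Invalid method name" TApplicationException.
  private boolean isThriftIncompatibility(Exception e) {
    String msg = e.getMessage();
    return msg != null && msg.contains("Invalid method name");
  }
}
```

   `createOrReplaceTable`'s rename step would then get the same detection behavior as every other method for free.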



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {
+      while (rs.next()) {
+        String colName = rs.getString(1);
+        String colType = rs.getString(2);
+        String comment = rs.getString(3);
+        if (colName != null && !colName.trim().isEmpty()
+            && !colName.startsWith("#")) {
+          fields.add(new FieldSchema(
+              colName.trim(),
+              colType != null ? colType.trim() : "",
+              comment));
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get field schemas via JDBC for: " + tableName, e);
+    }
+    return fields;
+  }
+
+  /**
+   * Retrieves a single table property via {@code SHOW TBLPROPERTIES}.
+   */
+  public Option<String> getTableProperty(String tableName, String key) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TBLPROPERTIES " + fqTable + " ('" + key + "')")) {
+      if (rs.next()) {
+        String value = rs.getString(2);
+        if (value != null && !value.contains("does not exist")) {
+          return Option.of(value);
+        }
+      }
+      return Option.empty();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table property via JDBC: " + key, e);
+    }
+  }
+
+  /**
+   * Sets table properties via {@code ALTER TABLE ... SET TBLPROPERTIES}.
+   */
+  public void setTableProperties(String tableName, Map<String, String> properties) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    StringBuilder sb = new StringBuilder("ALTER TABLE ")
+        .append(fqTable).append(" SET TBLPROPERTIES (");
+    boolean first = true;
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("'").append(entry.getKey()).append("'='")
+          .append(entry.getValue()).append("'");
+      first = false;
+    }
+    sb.append(")");
+    executeSQL(sb.toString());
+  }
+
+  /**
+   * Retrieves the table location from {@code DESCRIBE FORMATTED}.
+   */
+  public String getTableLocation(String tableName) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE FORMATTED " + fqTable)) {
+      while (rs.next()) {
+        String col = rs.getString(1);
+        if (col != null && col.trim().equals("Location:")) {
+          return rs.getString(2).trim();
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table location via JDBC for: " + tableName, e);
+    }
+    throw new HoodieHiveSyncException(
+        "Location not found in DESCRIBE FORMATTED for: " + tableName);
+  }
+
+  /**
+   * Lists all partitions via {@code SHOW PARTITIONS}.
+   *
+   * <p>Note: partition locations are not available from
+   * {@code SHOW PARTITIONS}. The returned {@link Partition} objects
+   * have locations constructed from the table base path.
+   */
+  public List<Partition> getAllPartitions(String tableName, String basePath) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    List<Partition> partitions = new ArrayList<>();
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("SHOW PARTITIONS " + fqTable)) {
+      while (rs.next()) {
+        String partSpec = rs.getString(1);
+        List<String> values = new ArrayList<>();
+        for (String kv : partSpec.split("/")) {
+          int idx = kv.indexOf("=");
+          values.add(idx >= 0 ? kv.substring(idx + 1) : kv);
+        }
+        String location = basePath + "/" + partSpec;
+        partitions.add(new Partition(values, location));

Review Comment:
   It'd be good to add an example of the query output and parsed partitions in 
the Javadocs.
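   To illustrate the ask, a standalone sketch of the parsing with the suggested example in the Javadoc (the sample spec and base path below are made up):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpecParser {

  /**
   * Parses one {@code SHOW PARTITIONS} row into partition values.
   *
   * <p>Example: for {@code partSpec = "year=2024/month=01"}, the parsed
   * values are {@code ["2024", "01"]}, and with
   * {@code basePath = "s3://bucket/tbl"} the derived location is
   * {@code "s3://bucket/tbl/year=2024/month=01"}.
   */
  public static List<String> parseValues(String partSpec) {
    List<String> values = new ArrayList<>();
    for (String kv : partSpec.split("/")) {
      int idx = kv.indexOf('=');
      // Keep only the value part of each key=value segment.
      values.add(idx >= 0 ? kv.substring(idx + 1) : kv);
    }
    return values;
  }

  /** Location derived from the table base path, as in the PR's loop body. */
  public static String location(String basePath, String partSpec) {
    return basePath + "/" + partSpec;
  }
}
```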



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {
+      while (rs.next()) {
+        String colName = rs.getString(1);
+        String colType = rs.getString(2);
+        String comment = rs.getString(3);
+        if (colName != null && !colName.trim().isEmpty()
+            && !colName.startsWith("#")) {
+          fields.add(new FieldSchema(
+              colName.trim(),
+              colType != null ? colType.trim() : "",
+              comment));
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get field schemas via JDBC for: " + tableName, e);
+    }
+    return fields;
+  }
+
+  /**
+   * Retrieves a single table property via {@code SHOW TBLPROPERTIES}.
+   */
+  public Option<String> getTableProperty(String tableName, String key) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TBLPROPERTIES " + fqTable + " ('" + key + "')")) {
+      if (rs.next()) {
+        String value = rs.getString(2);
+        if (value != null && !value.contains("does not exist")) {
+          return Option.of(value);
+        }
+      }
+      return Option.empty();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table property via JDBC: " + key, e);
+    }
+  }
+
+  /**
+   * Sets table properties via {@code ALTER TABLE ... SET TBLPROPERTIES}.
+   */
+  public void setTableProperties(String tableName, Map<String, String> properties) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    StringBuilder sb = new StringBuilder("ALTER TABLE ")
+        .append(fqTable).append(" SET TBLPROPERTIES (");
+    boolean first = true;
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("'").append(entry.getKey()).append("'='")
+          .append(entry.getValue()).append("'");
+      first = false;
+    }
+    sb.append(")");
+    executeSQL(sb.toString());
+  }
+
+  /**
+   * Retrieves the table location from {@code DESCRIBE FORMATTED}.
+   */
+  public String getTableLocation(String tableName) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE FORMATTED " + fqTable)) {
+      while (rs.next()) {
+        String col = rs.getString(1);
+        if (col != null && col.trim().equals("Location:")) {
+          return rs.getString(2).trim();
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table location via JDBC for: " + tableName, e);
+    }
+    throw new HoodieHiveSyncException(
+        "Location not found in DESCRIBE FORMATTED for: " + tableName);
+  }
+
+  /**
+   * Lists all partitions via {@code SHOW PARTITIONS}.
+   *
+   * <p>Note: partition locations are not available from
+   * {@code SHOW PARTITIONS}. The returned {@link Partition} objects
+   * have locations constructed from the table base path.
+   */
+  public List<Partition> getAllPartitions(String tableName, String basePath) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    List<Partition> partitions = new ArrayList<>();
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("SHOW PARTITIONS " + fqTable)) {
+      while (rs.next()) {
+        String partSpec = rs.getString(1);
+        List<String> values = new ArrayList<>();
+        for (String kv : partSpec.split("/")) {
+          int idx = kv.indexOf("=");
+          values.add(idx >= 0 ? kv.substring(idx + 1) : kv);
+        }
+        String location = basePath + "/" + partSpec;
+        partitions.add(new Partition(values, location));
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get partitions via JDBC for: " + tableName, e);
+    }
+    return partitions;
+  }
+
+  /**
+   * Drops a table via {@code DROP TABLE IF EXISTS}.
+   */
+  public void dropTable(String tableName) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    executeSQL("DROP TABLE IF EXISTS " + fqTable);
+    LOG.info("Dropped table via JDBC: {}.{}", databaseName, tableName);
+  }
+
+  /**
+   * Renames a table via {@code ALTER TABLE ... RENAME TO}.
+   */
+  public void renameTable(String oldName, String newName) {
+    String fqOld = "`" + databaseName + "`.`" + oldName + "`";
+    String fqNew = "`" + databaseName + "`.`" + newName + "`";
+    executeSQL("ALTER TABLE " + fqOld + " RENAME TO " + fqNew);
+  }
+
+  /**
+   * Sets the table location and serde path via ALTER TABLE.
+   */
+  public void setTableLocation(String tableName, String location, String serdePathKey) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    executeSQL("ALTER TABLE " + fqTable + " SET LOCATION '" + location + "'");
+    if (serdePathKey != null) {
+      executeSQL("ALTER TABLE " + fqTable
+          + " SET SERDEPROPERTIES ('" + serdePathKey + "'='" + location + "')");
+    }
+  }
+
+  /**
+   * Sets the storage format and serde properties via ALTER TABLE.
+   */
+  public void setStorageFormat(String tableName, String inputFormat,
+                               String outputFormat, String serdeClass,
+                               Map<String, String> serdeProperties) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    executeSQL("ALTER TABLE " + fqTable + " SET FILEFORMAT INPUTFORMAT '"
+        + inputFormat + "' OUTPUTFORMAT '" + outputFormat
+        + "' SERDE '" + serdeClass + "'");
+    if (serdeProperties != null && !serdeProperties.isEmpty()) {

Review Comment:
   Should the caller guarantee that `serdeProperties` is not null?
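   If the caller can't guarantee non-null, one option is a small defensive guard before building the statement. The sketch below is hypothetical (not Hudi code); `normalize` also takes a defensive copy so in-place mutations like `putIfAbsent` don't leak back into the caller's map:

```java
// Hypothetical sketch: normalize a possibly-null serdeProperties map
// before building the ALTER TABLE statement. Not actual Hudi code.
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerdePropsGuard {
  static Map<String, String> normalize(Map<String, String> serdeProperties) {
    if (serdeProperties == null || serdeProperties.isEmpty()) {
      return Collections.emptyMap();
    }
    // Defensive copy so later mutations (e.g. putIfAbsent) stay local.
    return new LinkedHashMap<>(serdeProperties);
  }

  public static void main(String[] args) {
    System.out.println(normalize(null).isEmpty());
  }
}
```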



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).

Review Comment:
   Do we plan to have an adapter pattern similar to `SparkAdapter` so that for HMS 4.x we can use a compatible `IMetaStoreClient`?



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);
+
+  private final Connection connection;
+  private final String databaseName;
+
+  public JDBCBasedMetadataOperator(Connection connection, String databaseName) {
+    this.connection = connection;
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * Checks if a table exists via {@code SHOW TABLES ... LIKE}.
+   */
+  public boolean tableExists(String tableName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TABLES IN `" + databaseName + "` LIKE '" + tableName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if table exists via JDBC: " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks if a database exists via {@code SHOW DATABASES LIKE}.
+   */
+  public boolean databaseExists(String dbName) {
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW DATABASES LIKE '" + dbName + "'")) {
+      return rs.next();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to check if database exists via JDBC: " + dbName, e);
+    }
+  }
+
+  /**
+   * Retrieves field schemas via {@code DESCRIBE}.
+   */
+  public List<FieldSchema> getFieldSchemas(String tableName) {
+    List<FieldSchema> fields = new ArrayList<>();
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE " + fqTable)) {
+      while (rs.next()) {
+        String colName = rs.getString(1);
+        String colType = rs.getString(2);
+        String comment = rs.getString(3);
+        if (colName != null && !colName.trim().isEmpty()
+            && !colName.startsWith("#")) {
+          fields.add(new FieldSchema(
+              colName.trim(),
+              colType != null ? colType.trim() : "",
+              comment));
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get field schemas via JDBC for: " + tableName, e);
+    }
+    return fields;
+  }
+
+  /**
+   * Retrieves a single table property via {@code SHOW TBLPROPERTIES}.
+   */
+  public Option<String> getTableProperty(String tableName, String key) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery(
+             "SHOW TBLPROPERTIES " + fqTable + " ('" + key + "')")) {
+      if (rs.next()) {
+        String value = rs.getString(2);
+        if (value != null && !value.contains("does not exist")) {
+          return Option.of(value);
+        }
+      }
+      return Option.empty();
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table property via JDBC: " + key, e);
+    }
+  }
+
+  /**
+   * Sets table properties via {@code ALTER TABLE ... SET TBLPROPERTIES}.
+   */
+  public void setTableProperties(String tableName, Map<String, String> properties) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    StringBuilder sb = new StringBuilder("ALTER TABLE ")
+        .append(fqTable).append(" SET TBLPROPERTIES (");
+    boolean first = true;
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("'").append(entry.getKey()).append("'='")
+          .append(entry.getValue()).append("'");
+      first = false;
+    }
+    sb.append(")");
+    executeSQL(sb.toString());
+  }
+
+  /**
+   * Retrieves the table location from {@code DESCRIBE FORMATTED}.
+   */
+  public String getTableLocation(String tableName) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("DESCRIBE FORMATTED " + fqTable)) {
+      while (rs.next()) {
+        String col = rs.getString(1);
+        if (col != null && col.trim().equals("Location:")) {
+          return rs.getString(2).trim();
+        }
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get table location via JDBC for: " + tableName, e);
+    }
+    throw new HoodieHiveSyncException(
+        "Location not found in DESCRIBE FORMATTED for: " + tableName);
+  }
+
+  /**
+   * Lists all partitions via {@code SHOW PARTITIONS}.
+   *
+   * <p>Note: partition locations are not available from
+   * {@code SHOW PARTITIONS}. The returned {@link Partition} objects
+   * have locations constructed from the table base path.
+   */
+  public List<Partition> getAllPartitions(String tableName, String basePath) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    List<Partition> partitions = new ArrayList<>();
+    try (Statement stmt = connection.createStatement();
+         ResultSet rs = stmt.executeQuery("SHOW PARTITIONS " + fqTable)) {
+      while (rs.next()) {
+        String partSpec = rs.getString(1);
+        List<String> values = new ArrayList<>();
+        for (String kv : partSpec.split("/")) {
+          int idx = kv.indexOf("=");
+          values.add(idx >= 0 ? kv.substring(idx + 1) : kv);
+        }
+        String location = basePath + "/" + partSpec;
+        partitions.add(new Partition(values, location));
+      }
+    } catch (SQLException e) {
+      throw new HoodieHiveSyncException(
+          "Failed to get partitions via JDBC for: " + tableName, e);
+    }
+    return partitions;
+  }
+
+  /**
+   * Drops a table via {@code DROP TABLE IF EXISTS}.
+   */
+  public void dropTable(String tableName) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    executeSQL("DROP TABLE IF EXISTS " + fqTable);
+    LOG.info("Dropped table via JDBC: {}.{}", databaseName, tableName);
+  }
+
+  /**
+   * Renames a table via {@code ALTER TABLE ... RENAME TO}.
+   */
+  public void renameTable(String oldName, String newName) {
+    String fqOld = "`" + databaseName + "`.`" + oldName + "`";
+    String fqNew = "`" + databaseName + "`.`" + newName + "`";
+    executeSQL("ALTER TABLE " + fqOld + " RENAME TO " + fqNew);
+  }
+
+  /**
+   * Sets the table location and serde path via ALTER TABLE.
+   */
+  public void setTableLocation(String tableName, String location, String serdePathKey) {
+    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
+    executeSQL("ALTER TABLE " + fqTable + " SET LOCATION '" + location + "'");
+    if (serdePathKey != null) {

Review Comment:
   Use `Option<String> serdePathKey` if it is optional?
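   As a sketch of what that could look like, using `java.util.Optional` as a stand-in for Hudi's own `Option` type (which exposes a similar `ifPresent` API) — hypothetical code, not the actual Hudi signature:

```java
// Hypothetical sketch: an Optional-typed serdePathKey replaces the null check.
// java.util.Optional stands in for org.apache.hudi.common.util.Option here.
import java.util.Optional;

public class SetLocationSketch {
  static String buildSerdeSql(String fqTable, String location, Optional<String> serdePathKey) {
    StringBuilder sb = new StringBuilder();
    // Only emit the SERDEPROPERTIES statement when a serde path key is given.
    serdePathKey.ifPresent(key -> sb.append("ALTER TABLE ").append(fqTable)
        .append(" SET SERDEPROPERTIES ('").append(key).append("'='")
        .append(location).append("')"));
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(buildSerdeSql("`db`.`t`", "/data", Optional.of("path")));
  }
}
```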



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -366,17 +501,33 @@ public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
       throw new HoodieHiveSyncException(
           "Not a valid completed timestamp " + timeStamp + " for table " + tableName);
     }
+    if (useJdbcFallback()) {
+      Map<String, String> props = new HashMap<>();
+      props.put(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
+      jdbcMetadataOperator.setTableProperties(tableName, props);
+      return;
+    }
     try {
       Table table = client.getTable(databaseName, tableName);
       table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
       client.alter_table(databaseName, tableName, table);
     } catch (Exception e) {
+      if (detectThriftIncompatibility(e)) {
+        Map<String, String> props = new HashMap<>();
+        props.put(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
+        jdbcMetadataOperator.setTableProperties(tableName, props);
+        return;
+      }
       throw new HoodieHiveSyncException(
           "Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
     }
   }
 
   public void deleteLastReplicatedTimeStamp(String tableName) {
+    if (useJdbcFallback()) {
+      log.warn("deleteLastReplicatedTimeStamp via JDBC is a no-op for {}", tableName);

Review Comment:
   Does JDBC SQL not support table prop deletion?  Does `ALTER TABLE ... UNSET TBLPROPERTIES IF EXISTS ('key')` work?
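   For reference, Hive does support `ALTER TABLE ... UNSET TBLPROPERTIES` (with an optional `IF EXISTS` clause), so the delete could plausibly be backed by JDBC rather than a no-op. A hypothetical sketch of building that statement, following the string-building style of `JDBCBasedMetadataOperator` (not actual Hudi code):

```java
// Hypothetical sketch: build the UNSET TBLPROPERTIES statement that a
// JDBC-backed deleteLastReplicatedTimeStamp could execute. Not Hudi code.
public class UnsetTblPropertySketch {
  static String buildUnsetSql(String databaseName, String tableName, String key) {
    String fqTable = "`" + databaseName + "`.`" + tableName + "`";
    // IF EXISTS makes the statement a no-op when the property is absent.
    return "ALTER TABLE " + fqTable
        + " UNSET TBLPROPERTIES IF EXISTS ('" + key + "')";
  }

  public static void main(String[] args) {
    System.out.println(buildUnsetSql("db", "tbl", "last_replicated_ts"));
  }
}
```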



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -205,29 +277,57 @@ public boolean updateSerdeProperties(String tableName, Map<String, String> serde
       client.alter_table(databaseName, tableName, table);
       return true;
     } catch (Exception e) {
+      if (detectThriftIncompatibility(e)) {
+        return updateSerdePropertiesViaJdbc(tableName, serdeProperties, useRealtimeFormat);
+      }
       throw new HoodieHiveSyncException("Failed to update table serde info for table: " + tableName, e);
     }
   }
 
+  private boolean updateSerdePropertiesViaJdbc(String tableName, Map<String, String> serdeProperties, boolean useRealtimeFormat) {
+    serdeProperties.putIfAbsent("serialization.format", "1");
+    HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(
+        config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+    String inputFormat = getInputFormatClassName(baseFileFormat, useRealtimeFormat);
+    String outputFormat = getOutputFormatClassName(baseFileFormat);
+    String serdeClass = getSerDeClassName(baseFileFormat);
+    jdbcMetadataOperator.setStorageFormat(
+        tableName, inputFormat, outputFormat, serdeClass, serdeProperties);
+    return true;
+  }
+
   @Override
   public void updateTableSchema(String tableName, HoodieSchema newSchema, SchemaDifference schemaDiff) {
     ddlExecutor.updateTableDefinition(tableName, newSchema);
   }
 
   @Override
   public List<Partition> getAllPartitions(String tableName) {
+    if (useJdbcFallback()) {
+      return jdbcMetadataOperator.getAllPartitions(
+          tableName, config.getString(META_SYNC_BASE_PATH));
+    }
     try {
       return client.listPartitions(databaseName, tableName, (short) -1)
           .stream()
           .map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
           .collect(Collectors.toList());
     } catch (TException e) {
+      if (detectThriftIncompatibility(e)) {
+        return jdbcMetadataOperator.getAllPartitions(
+            tableName, config.getString(META_SYNC_BASE_PATH));
+      }
       throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
     }
   }
 
   @Override
   public List<Partition> getPartitionsFromList(String tableName, List<String> partitions) {
+    if (useJdbcFallback()) {
+      // JDBC does not support partition filtering; return all and let caller filter
+      return jdbcMetadataOperator.getAllPartitions(
+          tableName, config.getString(META_SYNC_BASE_PATH));

Review Comment:
   Could the partitions be filtered on the returned results from `jdbcMetadataOperator.getAllPartitions`?
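   A client-side filter over the returned list seems straightforward. The sketch below is hypothetical; `Partition` here is a minimal stand-in for `org.apache.hudi.sync.common.model.Partition`, and the match is on the location derived from the requested relative partition paths:

```java
// Hypothetical sketch: filter getAllPartitions results down to a requested
// list of relative partition paths. Partition is a minimal stand-in for
// org.apache.hudi.sync.common.model.Partition. Not Hudi code.
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionFilterSketch {
  static class Partition {
    final List<String> values;
    final String location;
    Partition(List<String> values, String location) {
      this.values = values;
      this.location = location;
    }
  }

  // Keep only partitions whose location matches a requested relative path.
  static List<Partition> filterByPaths(List<Partition> all, List<String> wanted, String basePath) {
    Set<String> wantedLocations = wanted.stream()
        .map(p -> basePath + "/" + p)
        .collect(Collectors.toSet());
    return all.stream()
        .filter(p -> wantedLocations.contains(p.location))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Partition> all = Arrays.asList(
        new Partition(Arrays.asList("2024"), "/base/dt=2024"),
        new Partition(Arrays.asList("2025"), "/base/dt=2025"));
    System.out.println(filterByPaths(all, Arrays.asList("dt=2025"), "/base").size());
  }
}
```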



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -387,6 +538,10 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
     } catch (NoSuchObjectException e) {
       // this is ok the table doesn't even exist.
     } catch (Exception e) {
+      if (detectThriftIncompatibility(e)) {
+        log.warn("deleteLastReplicatedTimeStamp via JDBC is a no-op for {}", tableName);

Review Comment:
   Same here



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -142,6 +202,11 @@ public boolean updateTableProperties(String tableName, Map<String, String> table
       return false;
     }
 
+    if (useJdbcFallback()) {
+      jdbcMetadataOperator.setTableProperties(tableName, tableProperties);
+      return true;

Review Comment:
   Could the `ALTER TABLE ... SET TBLPROPERTIES` statement return a result (no exception thrown) even though the statement fails, so that this should return `false` here?



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -79,6 +81,21 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   DDLExecutor ddlExecutor;
   private IMetaStoreClient client;
 
+  /**
+   * JDBC-based metadata operator, lazily initialized on first Thrift
+   * incompatibility. Only available when sync mode is JDBC.
+   */
+  private JDBCBasedMetadataOperator jdbcMetadataOperator;
+
+  /**
+   * Set to true after the first Thrift API call fails with a
+   * {@link TApplicationException}, indicating the HMS version uses an
+   * incompatible Thrift API (e.g., HMS 4.x renamed {@code get_table}
+   * to {@code get_table_req}). Once set, all subsequent metadata
+   * operations are routed through {@link #jdbcMetadataOperator}.
+   */
+  private volatile boolean thriftIncompatible;

Review Comment:
   Do we need to consider thread safety for this variable?
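   If concurrent sync calls are possible, an `AtomicBoolean` with `compareAndSet` would make the one-time switch-over (and its warning log) race-free. A hypothetical sketch, not the actual Hudi code:

```java
// Hypothetical sketch: AtomicBoolean.compareAndSet ensures exactly one thread
// observes the false -> true transition, so the warning is logged once even
// if several threads hit the Thrift incompatibility concurrently.
import java.util.concurrent.atomic.AtomicBoolean;

public class ThriftFallbackFlag {
  private final AtomicBoolean thriftIncompatible = new AtomicBoolean(false);

  // Returns true only for the caller that performed the transition,
  // i.e. the one that should emit the warning log.
  boolean markIncompatible() {
    return thriftIncompatible.compareAndSet(false, true);
  }

  boolean isIncompatible() {
    return thriftIncompatible.get();
  }

  public static void main(String[] args) {
    ThriftFallbackFlag flag = new ThriftFallbackFlag();
    System.out.println(flag.markIncompatible());
    System.out.println(flag.markIncompatible());
  }
}
```

A plain `volatile` boolean is safe for visibility, but two threads could both see `false` and log the warning twice; `compareAndSet` closes that window.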



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -98,19 +115,62 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
             ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
             break;
           case JDBC:
-            ddlExecutor = new JDBCExecutor(config);
+            JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
+            ddlExecutor = jdbcExecutor;
+            jdbcMetadataOperator = new JDBCBasedMetadataOperator(
+                jdbcExecutor.getConnection(), databaseName);
             break;
           default:
             throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
         }
       } else {
-        ddlExecutor = config.getBoolean(HIVE_USE_JDBC) ? new JDBCExecutor(config) : new HiveQueryDDLExecutor(config, this.client);
+        if (config.getBoolean(HIVE_USE_JDBC)) {
+          JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
+          ddlExecutor = jdbcExecutor;
+          jdbcMetadataOperator = new JDBCBasedMetadataOperator(
+              jdbcExecutor.getConnection(), databaseName);
+        } else {
+          ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
+        }
       }
     } catch (Exception e) {
      throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
     }
   }
 
+  /**
+   * Returns true if Thrift API was detected as incompatible and JDBC
+   * fallback is available. When true, metadata operations should use
+   * {@link #jdbcMetadataOperator} instead of {@link #client}.
+   */
+  private boolean useJdbcFallback() {
+    return thriftIncompatible && jdbcMetadataOperator != null;
+  }
+
+  /**
+   * Checks if the given exception (or its cause chain) contains a
+   * {@link TApplicationException}, which indicates an incompatible
+   * Thrift API (e.g., HMS 4.x). If detected, switches all subsequent
+   * metadata operations to the JDBC fallback.
+   *
+   * @return true if JDBC fallback is now active
+   */
+  private boolean detectThriftIncompatibility(Exception e) {
+    Throwable cause = e;
+    while (cause != null) {
+      if (cause instanceof TApplicationException) {
+        if (!thriftIncompatible) {
+          log.warn("Thrift API incompatible with HMS, switching to JDBC"
+              + " fallback for metadata operations: {}", cause.getMessage());
+          thriftIncompatible = true;
+        }
+        return jdbcMetadataOperator != null;

Review Comment:
   Could there be a case that `jdbcMetadataOperator` is `null`?



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCBasedMetadataOperator.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides metadata query operations (tableExists, databaseExists,
+ * getTableSchema, etc.) backed by a JDBC connection to HiveServer2.
+ *
+ * <p>This is used as a fallback when the Thrift-based
+ * {@link org.apache.hadoop.hive.metastore.IMetaStoreClient} is
+ * incompatible with the target HMS version (e.g., HMS 4.x changed the
+ * Thrift API from {@code get_table} to {@code get_table_req}).
+ *
+ * <p>All SQL queries use standard HiveQL that is stable across Hive
+ * versions. The operator does not manage the lifecycle of the JDBC
+ * connection — the caller ({@link JDBCExecutor}) owns it.
+ */
+public class JDBCBasedMetadataOperator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCBasedMetadataOperator.class);

Review Comment:
   Use Lombok's `@Slf4j` annotation for creating the logger?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]