This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new f194f4cb [590] Add Hudi HMS Catalog Sync Implementation
f194f4cb is described below

commit f194f4cbc8a54a2cfd49c18b14d656abb046baf2
Author: Vamsi <va...@onehouse.ai>
AuthorDate: Tue Feb 11 20:00:00 2025 +0530

    [590] Add Hudi HMS Catalog Sync Implementation
---
 ...va => HudiCatalogTablePropertiesExtractor.java} |  14 +-
 ...> TestHudiCatalogTablePropertiesExtractor.java} |  10 +-
 xtable-hive-metastore/pom.xml                      |   8 +
 .../org/apache/xtable/hms/HMSCatalogConfig.java    |  11 +
 .../hms/HMSCatalogPartitionSyncOperations.java     | 221 ++++++++++
 .../apache/xtable/hms/HMSCatalogSyncClient.java    |  22 +-
 .../xtable/hms/HMSCatalogTableBuilderFactory.java  |   7 +-
 .../hms/table/HudiHMSCatalogTableBuilder.java      | 204 +++++++++
 ...ntTestBase.java => HMSCatalogSyncTestBase.java} |  10 +-
 .../hms/TestHMSCatalogPartitionSyncOperations.java | 470 +++++++++++++++++++++
 .../xtable/hms/TestHMSCatalogSyncClient.java       |  95 +++--
 .../hms/TestHMSCatalogTableBuilderFactory.java     |   8 +-
 .../hms/table/TestDeltaHMSCatalogTableBuilder.java |   4 +-
 .../hms/table/TestHudiHMSCatalogTableBuilder.java  | 143 +++++++
 .../table/TestIcebergHMSCatalogTableBuilder.java   |   4 +-
 .../apache/xtable/utilities/RunCatalogSync.java    |  25 +-
 .../src/test/resources/catalogConfig.yaml          |   1 +
 17 files changed, 1200 insertions(+), 57 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
similarity index 91%
rename from 
xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
rename to 
xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
index e6d99706..ff26e600 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
@@ -23,6 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
 import org.apache.spark.sql.types.StructType;
 
 import org.apache.hudi.common.util.StringUtils;
@@ -33,15 +36,22 @@ import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.schema.SparkSchemaExtractor;
 
 /** Util class to fetch details about Hudi table */
-public class HudiCatalogTableUtils {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class HudiCatalogTablePropertiesExtractor {
+
+  private static final HudiCatalogTablePropertiesExtractor INSTANCE =
+      new HudiCatalogTablePropertiesExtractor();
 
+  public static HudiCatalogTablePropertiesExtractor getInstance() {
+    return INSTANCE;
+  }
   /**
    * Get Spark Sql related table properties. This is used for spark datasource 
table.
    *
    * @param schema The schema to write to the table.
    * @return A new parameters added the spark's table properties.
    */
-  public static Map<String, String> getSparkTableProperties(
+  public Map<String, String> getSparkTableProperties(
       List<String> partitionNames,
       String sparkVersion,
       int schemaLengthThreshold,
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
similarity index 94%
rename from 
xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
rename to 
xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
index e39d63e5..e08674bf 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
@@ -32,7 +32,7 @@ import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
 
-public class TestHudiCatalogTableUtils {
+public class TestHudiCatalogTablePropertiesExtractor {
 
   @Test
   void testGetSparkTableProperties() {
@@ -81,8 +81,8 @@ public class TestHudiCatalogTableUtils {
             .build();
 
     Map<String, String> result =
-        HudiCatalogTableUtils.getSparkTableProperties(
-            partitionNames, sparkVersion, schemaLengthThreshold, schema);
+        HudiCatalogTablePropertiesExtractor.getInstance()
+            .getSparkTableProperties(partitionNames, sparkVersion, 
schemaLengthThreshold, schema);
 
     // Validate results
     assertEquals("hudi", result.get("spark.sql.sources.provider"));
@@ -127,8 +127,8 @@ public class TestHudiCatalogTableUtils {
 
     // Call the method
     Map<String, String> result =
-        HudiCatalogTableUtils.getSparkTableProperties(
-            partitionNames, "", schemaLengthThreshold, schema);
+        HudiCatalogTablePropertiesExtractor.getInstance()
+            .getSparkTableProperties(partitionNames, "", 
schemaLengthThreshold, schema);
 
     assertEquals("hudi", result.get("spark.sql.sources.provider"));
     assertNull(result.get("spark.sql.create.version"));
diff --git a/xtable-hive-metastore/pom.xml b/xtable-hive-metastore/pom.xml
index 01037a1f..20520f13 100644
--- a/xtable-hive-metastore/pom.xml
+++ b/xtable-hive-metastore/pom.xml
@@ -42,6 +42,14 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- Hudi dependencies -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hive-sync</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Iceberg dependencies  -->
         <dependency>
             <groupId>org.apache.iceberg</groupId>
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
index f6f9eabd..36fffdd9 100644
--- 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
@@ -26,6 +26,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -43,6 +45,15 @@ public class HMSCatalogConfig {
   @JsonProperty("externalCatalog.hms.serverUrl")
   private final String serverUrl;
 
+  @JsonProperty("externalCatalog.hms.schema_string_length_thresh")
+  private final int schemaLengthThreshold = 4000;
+
+  @JsonProperty("externalCatalog.hms.partition_extractor_class")
+  private final String partitionExtractorClass = 
MultiPartKeysValueExtractor.class.getName();
+
+  @JsonProperty("externalCatalog.hms.max_partitions_per_request")
+  private final int maxPartitionsPerRequest = 1000;
+
   protected static HMSCatalogConfig of(Map<String, String> properties) {
     try {
       return OBJECT_MAPPER.readValue(
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..89b53ca5
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.TableNotFoundException;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+@Log4j2
+public class HMSCatalogPartitionSyncOperations implements 
CatalogPartitionSyncOperations {
+
+  private final IMetaStoreClient metaStoreClient;
+  private final HMSCatalogConfig catalogConfig;
+
+  /**
+   * Creates the partition sync operations for a Hive Metastore catalog.
+   *
+   * @param metaStoreClient client used for every metastore call issued by this class
+   * @param hmsCatalogConfig HMS catalog settings; read when batching partition additions
+   */
+  public HMSCatalogPartitionSyncOperations(
+      IMetaStoreClient metaStoreClient, HMSCatalogConfig hmsCatalogConfig) {
+    this.catalogConfig = hmsCatalogConfig;
+    this.metaStoreClient = metaStoreClient;
+  }
+
+  /**
+   * Lists every partition registered in the metastore for the given table.
+   *
+   * @param catalogTableIdentifier identifier of the table whose partitions are listed
+   * @return one {@link CatalogPartition} (values plus storage location) per metastore partition
+   * @throws CatalogSyncException if the metastore call fails
+   */
+  @Override
+  public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      // NOTE(review): -1 as the max-parts argument is understood to mean "no limit" in the
+      // metastore client API; confirm against the Hive version in use.
+      List<Partition> hmsPartitions =
+          metaStoreClient.listPartitions(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), (short) -1);
+      List<CatalogPartition> catalogPartitions = new ArrayList<>();
+      for (Partition hmsPartition : hmsPartitions) {
+        catalogPartitions.add(
+            new CatalogPartition(hmsPartition.getValues(), hmsPartition.getSd().getLocation()));
+      }
+      return catalogPartitions;
+    } catch (TException e) {
+      throw new CatalogSyncException(
+          "Failed to get all partitions for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Registers the given partitions on the table, sending them to the metastore in batches of at
+   * most {@code catalogConfig.getMaxPartitionsPerRequest()} partitions.
+   *
+   * <p>Each new partition reuses the table's storage descriptor settings and only differs in its
+   * location. Nothing is sent when {@code partitionsToAdd} is empty.
+   *
+   * @param catalogTableIdentifier identifier of the table receiving the partitions
+   * @param partitionsToAdd partitions to register
+   * @throws CatalogSyncException if the table lookup or any batch fails
+   */
+  @Override
+  public void addPartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (partitionsToAdd.isEmpty()) {
+      log.info("No partitions to add for {}", tableIdentifier);
+      return;
+    }
+    log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableIdentifier);
+    try {
+      Table hmsTable =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      StorageDescriptor tableSd = hmsTable.getSd();
+      int batchSize = catalogConfig.getMaxPartitionsPerRequest();
+
+      CollectionUtils.batches(partitionsToAdd, batchSize)
+          .forEach(
+              batch -> {
+                List<Partition> hmsPartitions =
+                    batch.stream()
+                        .map(partition -> createPartition(tableIdentifier, partition, tableSd))
+                        .collect(Collectors.toList());
+                try {
+                  metaStoreClient.add_partitions(hmsPartitions, true, false);
+                } catch (TException e) {
+                  log.error("{} add partition failed", tableIdentifier, e);
+                  throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
+                }
+                log.info("Add batch partitions done: {}", hmsPartitions.size());
+              });
+    } catch (TException e) {
+      log.error("Failed to add partitions to table {}", tableIdentifier, e);
+      throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
+    }
+  }
+
+  /**
+   * Rewrites the metastore entries of partitions whose storage location changed.
+   *
+   * <p>Each partition is rebuilt from the table's storage descriptor with the new location and
+   * sent in a single alter call. An empty {@code changedPartitions} list returns immediately, in
+   * line with {@link #addPartitionsToTable}, so no metastore round-trips are made for it.
+   *
+   * @param catalogTableIdentifier identifier of the table owning the partitions
+   * @param changedPartitions partitions to update
+   * @throws CatalogSyncException if the table lookup or the alter call fails
+   */
+  @Override
+  public void updatePartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (changedPartitions.isEmpty()) {
+      log.info("No partitions to update for {}", tableIdentifier);
+      return;
+    }
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      StorageDescriptor tableSd = table.getSd();
+
+      List<Partition> updatedPartitions =
+          changedPartitions.stream()
+              .map(partition -> createPartition(tableIdentifier, partition, tableSd))
+              .collect(Collectors.toList());
+
+      metaStoreClient.alter_partitions(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), updatedPartitions);
+    } catch (TException e) {
+      throw new CatalogSyncException(
+          "Failed to update partitions for the table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Drops the given partitions from the table, one metastore call per partition.
+   *
+   * @param catalogTableIdentifier identifier of the table owning the partitions
+   * @param partitionsToDrop partitions to remove, identified by their values
+   * @throws CatalogSyncException if any drop call fails; earlier drops are not rolled back
+   */
+  @Override
+  public void dropPartitions(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      String databaseName = tableIdentifier.getDatabaseName();
+      String tableName = tableIdentifier.getTableName();
+      for (CatalogPartition partitionToDrop : partitionsToDrop) {
+        // NOTE(review): the trailing false is presumably the client's deleteData flag, i.e.
+        // only the catalog entry is removed; confirm against IMetaStoreClient.
+        metaStoreClient.dropPartition(
+            databaseName, tableName, partitionToDrop.getValues(), false);
+      }
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to drop partitions for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Reads the requested table parameters from the metastore.
+   *
+   * <p>Keys that are absent from the table, or present with a null value, are left out of the
+   * result. Repeated keys in {@code keysToRetrieve} are looked up once, and a table that carries
+   * no parameter map yields an empty result.
+   *
+   * @param catalogTableIdentifier identifier of the table to read
+   * @param keysToRetrieve parameter names to look up
+   * @return a map holding each requested key found on the table together with its value
+   * @throws CatalogSyncException if the table cannot be fetched from the metastore
+   */
+  @Override
+  public Map<String, String> getTableProperties(
+      CatalogTableIdentifier catalogTableIdentifier, List<String> keysToRetrieve) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      Map<String, String> tableParameters = table.getParameters();
+
+      // Collectors.toMap rejects duplicate keys and null values, so both are filtered out
+      // first; the null check on the map itself covers tables without any parameters. Lambdas
+      // are used instead of a bound method reference because the latter would dereference a
+      // null map eagerly.
+      return keysToRetrieve.stream()
+          .distinct()
+          .filter(key -> tableParameters != null && tableParameters.get(key) != null)
+          .collect(Collectors.toMap(key -> key, key -> tableParameters.get(key)));
+    } catch (TableNotFoundException | TException e) {
+      throw new CatalogSyncException(
+          "failed to fetch last time synced properties for table" + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Merges the given entries into the table's parameters and writes the table back to the
+   * metastore. A null or empty {@code propertiesToUpdate} is a no-op.
+   *
+   * @param catalogTableIdentifier identifier of the table to alter
+   * @param propertiesToUpdate parameters to add or overwrite
+   * @throws CatalogSyncException if the table cannot be fetched or altered
+   */
+  @Override
+  public void updateTableProperties(
+      CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (propertiesToUpdate == null || propertiesToUpdate.isEmpty()) {
+      return;
+    }
+
+    try {
+      String databaseName = tableIdentifier.getDatabaseName();
+      String tableName = tableIdentifier.getTableName();
+      Table hmsTable = metaStoreClient.getTable(databaseName, tableName);
+
+      Map<String, String> mergedParameters = hmsTable.getParameters();
+      mergedParameters.putAll(propertiesToUpdate);
+      hmsTable.setParameters(mergedParameters);
+
+      metaStoreClient.alter_table(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), hmsTable);
+    } catch (TableNotFoundException | TException e) {
+      throw new CatalogSyncException(
+          "failed to update last time synced properties for table" + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Builds a metastore partition for {@code partition}: columns, input/output formats and serde
+   * are taken from the table's storage descriptor, the location from the partition itself.
+   *
+   * @param tableIdentifier database and table the partition belongs to
+   * @param partition partition values and storage location
+   * @param sd storage descriptor of the owning table
+   * @return the partition object to send to the metastore
+   */
+  private Partition createPartition(
+      HierarchicalTableIdentifier tableIdentifier,
+      CatalogPartition partition,
+      StorageDescriptor sd) {
+    StorageDescriptor descriptor = new StorageDescriptor();
+    descriptor.setLocation(partition.getStorageLocation());
+    descriptor.setCols(sd.getCols());
+    descriptor.setSerdeInfo(sd.getSerdeInfo());
+    descriptor.setInputFormat(sd.getInputFormat());
+    descriptor.setOutputFormat(sd.getOutputFormat());
+
+    String databaseName = tableIdentifier.getDatabaseName();
+    String tableName = tableIdentifier.getTableName();
+    // The two zeros and the trailing null are passed through unchanged.
+    // NOTE(review): presumably create time, last-access time and partition parameters; confirm
+    // against the Partition constructor.
+    return new Partition(
+        partition.getValues(), databaseName, tableName, 0, 0, descriptor, null);
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
index 537de834..acfdfbd9 100644
--- 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
@@ -22,6 +22,7 @@ import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi
 
 import java.time.ZonedDateTime;
 import java.util.Collections;
+import java.util.Optional;
 
 import lombok.extern.log4j.Log4j2;
 
@@ -36,7 +37,9 @@ import org.apache.thrift.TException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
 import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.catalog.CatalogUtils;
 import org.apache.xtable.conversion.ExternalCatalogConfig;
 import org.apache.xtable.exception.CatalogSyncException;
 import org.apache.xtable.model.InternalTable;
@@ -55,6 +58,7 @@ public class HMSCatalogSyncClient implements 
CatalogSyncClient<Table> {
   private Configuration configuration;
   private IMetaStoreClient metaStoreClient;
   private CatalogTableBuilder<Table, Table> tableBuilder;
+  private Optional<CatalogPartitionSyncTool> partitionSyncTool;
 
   // For loading the instance using ServiceLoader
   public HMSCatalogSyncClient() {}
@@ -70,12 +74,14 @@ public class HMSCatalogSyncClient implements 
CatalogSyncClient<Table> {
       HMSCatalogConfig hmsCatalogConfig,
       Configuration configuration,
       IMetaStoreClient metaStoreClient,
-      CatalogTableBuilder tableBuilder) {
+      CatalogTableBuilder tableBuilder,
+      Optional<CatalogPartitionSyncTool> partitionSyncTool) {
     this.catalogConfig = catalogConfig;
     this.hmsCatalogConfig = hmsCatalogConfig;
     this.configuration = configuration;
     this.metaStoreClient = metaStoreClient;
     this.tableBuilder = tableBuilder;
+    this.partitionSyncTool = partitionSyncTool;
   }
 
   @Override
@@ -145,6 +151,8 @@ public class HMSCatalogSyncClient implements 
CatalogSyncClient<Table> {
     } catch (TException e) {
       throw new CatalogSyncException("Failed to create table: " + 
tableIdentifier.getId(), e);
     }
+
+    partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, 
tableIdentifier));
   }
 
   @Override
@@ -158,6 +166,8 @@ public class HMSCatalogSyncClient implements 
CatalogSyncClient<Table> {
     } catch (TException e) {
       throw new CatalogSyncException("Failed to refresh table: " + 
tableIdentifier.getId(), e);
     }
+
+    partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, 
tableIdentifier));
   }
 
   @Override
@@ -194,7 +204,15 @@ public class HMSCatalogSyncClient implements 
CatalogSyncClient<Table> {
     } catch (MetaException | HiveException e) {
       throw new CatalogSyncException("HiveMetastoreClient could not be 
created", e);
     }
-    this.tableBuilder = HMSCatalogTableBuilderFactory.getInstance(tableFormat, 
this.configuration);
+    this.tableBuilder =
+        HMSCatalogTableBuilderFactory.getTableBuilder(
+            tableFormat, hmsCatalogConfig, this.configuration);
+    this.partitionSyncTool =
+        CatalogUtils.getPartitionSyncTool(
+            tableFormat,
+            hmsCatalogConfig.getPartitionExtractorClass(),
+            new HMSCatalogPartitionSyncOperations(metaStoreClient, 
hmsCatalogConfig),
+            configuration);
   }
 
   /**
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
index 56c18279..43a77700 100644
--- 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.xtable.catalog.CatalogTableBuilder;
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder;
+import org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder;
 import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
 import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
@@ -40,13 +41,15 @@ import org.apache.xtable.model.storage.TableFormat;
 
 public class HMSCatalogTableBuilderFactory {
 
-  static CatalogTableBuilder<Table, Table> getInstance(
-      String tableFormat, Configuration configuration) {
+  public static CatalogTableBuilder<Table, Table> getTableBuilder(
+      String tableFormat, HMSCatalogConfig hmsCatalogConfig, Configuration 
configuration) {
     switch (tableFormat) {
       case TableFormat.ICEBERG:
         return new IcebergHMSCatalogTableBuilder(configuration);
       case TableFormat.DELTA:
         return new DeltaHMSCatalogTableBuilder();
+      case TableFormat.HUDI:
+        return new HudiHMSCatalogTableBuilder(hmsCatalogConfig, configuration);
       default:
         throw new NotSupportedException("Unsupported table format: " + 
tableFormat);
     }
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..cc4dc3df
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms.table;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hms.HMSCatalogConfig;
+import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.TableFormat;
+
+@Log4j2
+public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, 
Table> {
+
+  private final HudiTableManager hudiTableManager;
+  private final HMSSchemaExtractor schemaExtractor;
+  private final HMSCatalogConfig hmsCatalogConfig;
+  private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor;
+
+  private HoodieTableMetaClient metaClient;
+
+  protected static final String HUDI_METADATA_CONFIG = 
"hudi.metadata-listing-enabled";
+
+  /**
+   * Creates a builder that resolves its collaborators itself: a {@link HudiTableManager} for
+   * the given Hadoop configuration plus the shared schema and table-properties extractors.
+   *
+   * @param hmsCatalogConfig HMS catalog settings; read for the schema length threshold
+   * @param configuration Hadoop configuration handed to the Hudi table manager
+   */
+  public HudiHMSCatalogTableBuilder(
+      HMSCatalogConfig hmsCatalogConfig, Configuration configuration) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.hudiTableManager = HudiTableManager.of(configuration);
+    this.schemaExtractor = HMSSchemaExtractor.getInstance();
+    this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance();
+  }
+
+  /**
+   * Test-only constructor that takes every collaborator explicitly, including a pre-built meta
+   * client so that no table needs to be loaded from storage.
+   */
+  @VisibleForTesting
+  HudiHMSCatalogTableBuilder(
+      HMSCatalogConfig hmsCatalogConfig,
+      HMSSchemaExtractor schemaExtractor,
+      HudiTableManager hudiTableManager,
+      HoodieTableMetaClient metaClient,
+      HudiCatalogTablePropertiesExtractor propertiesExtractor) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.schemaExtractor = schemaExtractor;
+    this.hudiTableManager = hudiTableManager;
+    this.metaClient = metaClient;
+    this.tablePropertiesExtractor = propertiesExtractor;
+  }
+
+  /**
+   * Returns the Hudi meta client, loading it from {@code basePath} on first use and reusing the
+   * cached instance on every later call.
+   *
+   * @param basePath base path of the Hudi table; only consulted while no client is cached
+   * @return the cached or newly loaded meta client
+   * @throws CatalogSyncException if no meta client could be loaded for {@code basePath}
+   */
+  HoodieTableMetaClient getMetaClient(String basePath) {
+    if (metaClient != null) {
+      return metaClient;
+    }
+    metaClient =
+        hudiTableManager
+            .loadTableMetaClientIfExists(basePath)
+            .orElseThrow(
+                () ->
+                    new CatalogSyncException(
+                        "Failed to get meta client since table is not present in the base path "
+                            + basePath));
+    return metaClient;
+  }
+
+  /**
+   * Builds the metastore table definition used to register a Hudi table.
+   *
+   * <p>The table is owned by the current Hadoop user and carries the Hudi/Spark table
+   * properties, a storage descriptor derived from the table's base file format, and the
+   * partition keys resolved from the table's partitioning fields.
+   *
+   * @param table internal representation of the table being synced
+   * @param catalogTableIdentifier target database and table name
+   * @return the table object to pass to the metastore create call
+   * @throws CatalogSyncException if the current user cannot be resolved
+   */
+  @Override
+  public Table getCreateTableRequest(
+      InternalTable table, CatalogTableIdentifier catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    Table newTb = new Table();
+    newTb.setDbName(tableIdentifier.getDatabaseName());
+    newTb.setTableName(tableIdentifier.getTableName());
+    try {
+      newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+    } catch (IOException e) {
+      throw new CatalogSyncException(
+          "Failed to set owner for hms table: " + tableIdentifier.getTableName(), e);
+    }
+
+    // createTime is a 32-bit field holding epoch seconds; narrowing epoch milliseconds to int
+    // overflows and stores a meaningless value.
+    newTb.setCreateTime((int) Instant.now().getEpochSecond());
+    Map<String, String> tableProperties = getTableProperties(table, table.getReadSchema());
+    newTb.setParameters(tableProperties);
+    newTb.setSd(getStorageDescriptor(table));
+    newTb.setPartitionKeys(getSchemaPartitionKeys(table));
+    return newTb;
+  }
+
+  /**
+   * Refreshes an existing metastore table in place: the Hudi/Spark table properties are merged
+   * over the current parameters and the storage descriptor is rebuilt from the latest schema.
+   *
+   * @param table internal representation of the table being synced
+   * @param hmsTable table as currently stored in the metastore; mutated and returned
+   * @param tableIdentifier target table identifier (not read by this implementation)
+   * @return {@code hmsTable} with updated parameters and storage descriptor
+   */
+  @Override
+  public Table getUpdateTableRequest(
+      InternalTable table, Table hmsTable, CatalogTableIdentifier tableIdentifier) {
+    // The parameter map may be unset on the incoming table, so start from an empty map then.
+    Map<String, String> tableParameters =
+        hmsTable.getParameters() == null ? new HashMap<>() : hmsTable.getParameters();
+    tableParameters.putAll(getTableProperties(table, table.getReadSchema()));
+    hmsTable.setParameters(tableParameters);
+
+    // getStorageDescriptor already populates the columns from the read schema, so no separate
+    // setCols call is needed afterwards.
+    hmsTable.setSd(getStorageDescriptor(table));
+    return hmsTable;
+  }
+
+  /**
+   * Assembles the table parameters: the Hudi metadata-listing flag plus the Spark datasource
+   * properties derived from the schema and the partition field names.
+   *
+   * @param table internal table providing the partitioning fields
+   * @param schema schema passed to the Spark properties extractor
+   * @return a new mutable map of table parameters
+   */
+  private Map<String, String> getTableProperties(InternalTable table, InternalSchema schema) {
+    List<String> partitionFieldNames =
+        table.getPartitioningFields().stream()
+            .map(partitionField -> partitionField.getSourceField().getName())
+            .collect(Collectors.toList());
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HUDI_METADATA_CONFIG, "true");
+    // An empty Spark version string is passed to the extractor.
+    properties.putAll(
+        tablePropertiesExtractor.getSparkTableProperties(
+            partitionFieldNames, "", hmsCatalogConfig.getSchemaLengthThreshold(), schema));
+    return properties;
+  }
+
+  /**
+   * Builds the storage descriptor for the table: columns from the read schema, location from
+   * the base path, and input format, output format and serde chosen from the Hudi table's base
+   * file format.
+   *
+   * @param table internal table providing schema and base path
+   * @return a freshly populated storage descriptor
+   */
+  private StorageDescriptor getStorageDescriptor(InternalTable table) {
+    StorageDescriptor descriptor = new StorageDescriptor();
+    descriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()));
+
+    String basePath = table.getBasePath();
+    descriptor.setLocation(basePath);
+
+    HoodieFileFormat baseFileFormat =
+        getMetaClient(basePath).getTableConfig().getBaseFileFormat();
+    descriptor.setInputFormat(
+        HudiInputFormatUtils.getInputFormatClassName(baseFileFormat, false));
+    descriptor.setOutputFormat(HudiInputFormatUtils.getOutputFormatClassName(baseFileFormat));
+
+    SerDeInfo serde = new SerDeInfo();
+    serde.setSerializationLib(HudiInputFormatUtils.getSerDeClassName(baseFileFormat));
+    serde.setParameters(getSerdeProperties(basePath));
+    descriptor.setSerdeInfo(serde);
+    return descriptor;
+  }
+
+  /**
+   * Returns the serde parameters for the table: its path and the read-optimized query flag,
+   * which is always written as {@code "false"}.
+   *
+   * @param basePath base path of the Hudi table
+   * @return a new mutable map with the two serde entries
+   */
+  private static Map<String, String> getSerdeProperties(String basePath) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
+    properties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(false));
+    return properties;
+  }
+
+  /**
+   * Resolves the metastore partition keys for the table's partitioning fields.
+   *
+   * <p>A partition field whose source field name matches a column of the read schema reuses
+   * that column's definition; any other field is declared as a {@code string} column with an
+   * empty comment.
+   *
+   * @param table internal table providing partitioning fields and read schema
+   * @return one {@link FieldSchema} per partitioning field, in partitioning order
+   */
+  private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {
+    List<InternalPartitionField> partitioningFields = table.getPartitioningFields();
+    Map<String, FieldSchema> columnsByName =
+        schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()).stream()
+            .collect(Collectors.toMap(FieldSchema::getName, column -> column));
+
+    return partitioningFields.stream()
+        .map(partitionField -> partitionField.getSourceField().getName())
+        .map(
+            sourceFieldName -> {
+              // toMap never stores null values, so a null lookup means "no such column".
+              FieldSchema knownColumn = columnsByName.get(sourceFieldName);
+              return knownColumn != null
+                  ? knownColumn
+                  : new FieldSchema(sourceFieldName, "string", "");
+            })
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
similarity index 95%
rename from 
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
rename to 
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
index cd76c6be..62531e75 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
@@ -43,7 +43,7 @@ import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.storage.CatalogType;
 import org.apache.xtable.model.storage.TableFormat;
 
-public class HMSCatalogSyncClientTestBase {
+public class HMSCatalogSyncTestBase {
 
   @Mock protected IMetaStoreClient mockMetaStoreClient;
   @Mock protected HMSCatalogConfig mockHMSCatalogConfig;
@@ -139,6 +139,14 @@ public class HMSCatalogSyncClientTestBase {
           .readSchema(INTERNAL_SCHEMA)
           .partitioningFields(Collections.singletonList(PARTITION_FIELD))
           .build();
+  protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE =
+      InternalTable.builder()
+          .basePath(TEST_BASE_PATH)
+          .tableFormat(TableFormat.HUDI)
+          .readSchema(UPDATED_INTERNAL_SCHEMA)
+          .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+          .build();
+
   protected static final ThreePartHierarchicalTableIdentifier 
TEST_CATALOG_TABLE_IDENTIFIER =
       new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, 
TEST_HMS_TABLE);
 
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..b1422d94
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHMSCatalogPartitionSyncOperations extends 
HMSCatalogSyncTestBase {
+
+  private CatalogPartitionSyncOperations hmsPartitionSyncOperations;
+
+  void setupCommonMocks() { // builds the object under test from the mocks in HMSCatalogSyncTestBase
+    hmsPartitionSyncOperations =
+        new HMSCatalogPartitionSyncOperations(mockMetaStoreClient, 
mockHMSCatalogConfig);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetAllPartitions() { // each listed Hive partition becomes one CatalogPartition
+    setupCommonMocks();
+
+    Partition hivePartition1 = new Partition();
+    hivePartition1.setValues(Collections.singletonList("value1"));
+    StorageDescriptor sd1 = new StorageDescriptor();
+    sd1.setLocation("location1");
+    hivePartition1.setSd(sd1);
+
+    Partition hivePartition2 = new Partition();
+    hivePartition2.setValues(Collections.singletonList("value2"));
+    StorageDescriptor sd2 = new StorageDescriptor();
+    sd2.setLocation("location2");
+    hivePartition2.setSd(sd2);
+
+    List<Partition> hivePartitions = Arrays.asList(hivePartition1, 
hivePartition2);
+    when(mockMetaStoreClient.listPartitions(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+            (short) -1)) // NOTE(review): -1 presumably means "no limit" — confirm
+        .thenReturn(hivePartitions);
+    List<CatalogPartition> partitions =
+        
hmsPartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER);
+
+    assertEquals(2, partitions.size()); // result keeps the order of the stubbed list
+    assertEquals("location1", partitions.get(0).getStorageLocation());
+    assertEquals(1, partitions.get(0).getValues().size());
+    assertEquals("value1", partitions.get(0).getValues().get(0));
+    assertEquals("location2", partitions.get(1).getStorageLocation());
+    assertEquals(1, partitions.get(1).getValues().size());
+    assertEquals("value2", partitions.get(1).getValues().get(0));
+
+    verify(mockMetaStoreClient, times(1))
+        .listPartitions(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+            (short) -1);
+  }
+
+  @Test
+  void testAddPartitionsToTableSuccess() throws Exception {
+    setupCommonMocks();
+    when(mockHMSCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); // larger than the 2 partitions added below
+    CatalogPartition partition1 =
+        new CatalogPartition(Collections.singletonList("value1"), "location1");
+    CatalogPartition partition2 =
+        new CatalogPartition(Collections.singletonList("value2"), "location2");
+    List<CatalogPartition> partitionsToAdd = Arrays.asList(partition1, 
partition2);
+
+    StorageDescriptor tableSd = new StorageDescriptor();
+    tableSd.setCols(Collections.emptyList());
+    tableSd.setInputFormat("inputFormat");
+    tableSd.setOutputFormat("outputFormat");
+    tableSd.setSerdeInfo(new SerDeInfo());
+
+    Table table = new Table();
+    table.setSd(tableSd);
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenReturn(table);
+
+    // Act: add both partitions through a single addPartitionsToTable call
+    
hmsPartitionSyncOperations.addPartitionsToTable(TEST_CATALOG_TABLE_IDENTIFIER, 
partitionsToAdd);
+
+    // Verify add_partitions is invoked exactly once with the flags (true, false)
+    ArgumentCaptor<List<Partition>> partitionCaptor = 
ArgumentCaptor.forClass(List.class);
+
+    verify(mockMetaStoreClient, times(1))
+        .add_partitions(partitionCaptor.capture(), eq(true), eq(false));
+
+    // Captured Hive partitions must keep the input order, values and locations
+    List<Partition> capturedPartitions = partitionCaptor.getValue();
+    assertEquals(2, capturedPartitions.size());
+
+    Partition capturedPartition1 = capturedPartitions.get(0);
+    assertEquals(partition1.getValues(), capturedPartition1.getValues());
+    assertEquals(partition1.getStorageLocation(), 
capturedPartition1.getSd().getLocation());
+
+    Partition capturedPartition2 = capturedPartitions.get(1);
+    assertEquals(partition2.getValues(), capturedPartition2.getValues());
+    assertEquals(partition2.getStorageLocation(), 
capturedPartition2.getSd().getLocation());
+  }
+
+  @Test
+  void testAddPartitionsToTableThrowsException() throws Exception {
+    setupCommonMocks();
+    List<CatalogPartition> partitionsToAdd =
+        Collections.singletonList(
+            new CatalogPartition(Collections.singletonList("value1"), 
"location1"));
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenThrow(new TException("Test exception"));
+
+    // A getTable failure must surface as CatalogSyncException wrapping the TException
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsPartitionSyncOperations.addPartitionsToTable(
+                    TEST_CATALOG_TABLE_IDENTIFIER, partitionsToAdd));
+
+    assertInstanceOf(TException.class, exception.getCause());
+    verify(mockMetaStoreClient, times(1)) // getTable is attempted once, add_partitions never
+        .getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+    verify(mockMetaStoreClient, never()).add_partitions(anyList(), 
anyBoolean(), anyBoolean());
+  }
+
+  @Test
+  void testUpdatePartitionsToTableSuccess() throws Exception {
+    setupCommonMocks();
+
+    CatalogPartition changedPartition1 =
+        new CatalogPartition(Collections.singletonList("value1"), "location1");
+    CatalogPartition changedPartition2 =
+        new CatalogPartition(Collections.singletonList("value2"), "location2");
+    List<CatalogPartition> changedPartitions = 
Arrays.asList(changedPartition1, changedPartition2);
+
+    StorageDescriptor tableSd = new StorageDescriptor();
+    tableSd.setCols(Collections.emptyList());
+    tableSd.setInputFormat("inputFormat");
+    tableSd.setOutputFormat("outputFormat");
+    tableSd.setSerdeInfo(new SerDeInfo());
+
+    Table table = new Table();
+    table.setSd(tableSd);
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenReturn(table);
+
+    // Act: update both partitions through a single updatePartitionsToTable call
+    hmsPartitionSyncOperations.updatePartitionsToTable(
+        TEST_CATALOG_TABLE_IDENTIFIER, changedPartitions);
+
+    // Capture the partition list passed to the single alter_partitions call
+    ArgumentCaptor<List<Partition>> partitionCaptor = 
ArgumentCaptor.forClass(List.class);
+    verify(mockMetaStoreClient, times(1))
+        .alter_partitions(
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+            partitionCaptor.capture());
+    List<Partition> capturedPartitions = partitionCaptor.getValue();
+
+    assertEquals(2, capturedPartitions.size()); // input order, values and locations are preserved
+
+    Partition capturedPartition1 = capturedPartitions.get(0);
+    assertEquals(changedPartition1.getValues(), 
capturedPartition1.getValues());
+    assertEquals(changedPartition1.getStorageLocation(), 
capturedPartition1.getSd().getLocation());
+
+    Partition capturedPartition2 = capturedPartitions.get(1);
+    assertEquals(changedPartition2.getValues(), 
capturedPartition2.getValues());
+    assertEquals(changedPartition2.getStorageLocation(), 
capturedPartition2.getSd().getLocation());
+  }
+
+  @Test
+  void testUpdatePartitionsToTableThrowsException() throws Exception {
+    setupCommonMocks();
+    CatalogPartition changedPartition =
+        new CatalogPartition(Collections.singletonList("value1"), "location1");
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenThrow(new TException("Test exception"));
+
+    // A getTable failure must surface as CatalogSyncException wrapping the TException
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsPartitionSyncOperations.updatePartitionsToTable(
+                    TEST_CATALOG_TABLE_IDENTIFIER, 
Collections.singletonList(changedPartition)));
+
+    assertInstanceOf(TException.class, exception.getCause());
+
+    verify(mockMetaStoreClient, times(1)) // getTable is attempted once, alter_partitions never
+        .getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+    verify(mockMetaStoreClient, never()).alter_partitions(anyString(), 
anyString(), anyList());
+  }
+
+  @Test
+  void testDropPartitionsSuccess() throws Exception {
+    setupCommonMocks();
+
+    CatalogPartition partition1 =
+        new CatalogPartition(Collections.singletonList("value1"), "location1");
+    CatalogPartition partition2 =
+        new CatalogPartition(Collections.singletonList("value2"), "location2");
+    List<CatalogPartition> partitionsToDrop = Arrays.asList(partition1, 
partition2);
+
+    // Act: drop both partitions through a single dropPartitions call
+    hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER, 
partitionsToDrop);
+
+    // Capture the values list of each dropPartition call (two expected, last argument false)
+    ArgumentCaptor<List<String>> partitionValuesCaptor = 
ArgumentCaptor.forClass(List.class);
+
+    verify(mockMetaStoreClient, times(2))
+        .dropPartition(
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+            partitionValuesCaptor.capture(),
+            eq(false));
+
+    // Calls must arrive in input order, one values list per partition
+    List<List<String>> capturedPartitionValues = 
partitionValuesCaptor.getAllValues();
+    assertEquals(2, capturedPartitionValues.size());
+    assertEquals(partition1.getValues(), capturedPartitionValues.get(0));
+    assertEquals(partition2.getValues(), capturedPartitionValues.get(1));
+  }
+
+  @Test
+  void testDropPartitionsEmptyList() throws Exception {
+    setupCommonMocks();
+    List<CatalogPartition> partitionsToDrop = Collections.emptyList();
+
+    hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER, 
partitionsToDrop);
+
+    // An empty input must not trigger any dropPartition call
+    verify(mockMetaStoreClient, never())
+        .dropPartition(anyString(), anyString(), anyList(), anyBoolean());
+  }
+
+  @Test
+  void testDropPartitionsThrowsException() throws Exception {
+    setupCommonMocks();
+
+    CatalogPartition partition1 =
+        new CatalogPartition(Collections.singletonList("value1"), "location1");
+    List<CatalogPartition> partitionsToDrop = 
Collections.singletonList(partition1);
+
+    doThrow(new TException("Test exception")) // dropPartition is void-stubbed via doThrow(...).when(...)
+        .when(mockMetaStoreClient)
+        .dropPartition(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+            partition1.getValues(),
+            false);
+
+    // A dropPartition failure must surface as CatalogSyncException wrapping the TException
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsPartitionSyncOperations.dropPartitions(
+                    TEST_CATALOG_TABLE_IDENTIFIER, partitionsToDrop));
+
+    assertInstanceOf(TException.class, exception.getCause());
+
+    // The failing dropPartition call is attempted exactly once
+    verify(mockMetaStoreClient, times(1))
+        .dropPartition(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+            partition1.getValues(),
+            false);
+  }
+
+  @Test
+  void testGetTablePropertiesSuccess() throws Exception {
+    setupCommonMocks();
+
+    List<String> lastSyncedKeys = Arrays.asList("key1", "key2", "key3");
+
+    Map<String, String> mockParameters = new HashMap<>();
+    mockParameters.put("key1", "value1");
+    mockParameters.put("key2", "value2");
+    mockParameters.put("irrelevantKey", "irrelevantKey"); // present on the table but not requested
+
+    Table mockTable = new Table();
+    mockTable.setParameters(mockParameters);
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenReturn(mockTable);
+
+    // Act: request three keys, of which only two exist on the table
+    Map<String, String> result =
+        hmsPartitionSyncOperations.getTableProperties(
+            TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+    // Only requested keys that exist in the table parameters are returned
+    assertEquals(2, result.size());
+    assertEquals("value1", result.get("key1"));
+    assertEquals("value2", result.get("key2"));
+    assertNull(result.get("key3")); // key3 is not in mockParameters
+  }
+
+  @Test
+  void testGetTablePropertiesThrowsException() throws Exception {
+    setupCommonMocks();
+
+    List<String> lastSyncedKeys = Arrays.asList("key1", "key2");
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenThrow(new TException("Test exception"));
+
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsPartitionSyncOperations.getTableProperties(
+                    TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys));
+
+    assertInstanceOf(TException.class, exception.getCause()); // the Thrift error is kept as the cause
+  }
+
+  @Test
+  void testUpdateTablePropertiesSuccess() throws Exception {
+    setupCommonMocks();
+
+    Map<String, String> lastTimeSyncedProperties = new HashMap<>();
+    lastTimeSyncedProperties.put("last_synced_time", "2023-12-01T12:00:00Z");
+    lastTimeSyncedProperties.put("last_modified_by", "user123");
+
+    Map<String, String> existingParameters = new HashMap<>();
+    existingParameters.put("existing_key", "existing_value");
+
+    Table mockTable = new Table();
+    mockTable.setParameters(existingParameters);
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenReturn(mockTable);
+
+    // Act: push two new properties onto a table that already has one
+    hmsPartitionSyncOperations.updateTableProperties(
+        TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties);
+
+    // The fetched table object is written back with exactly one alter_table call
+    verify(mockMetaStoreClient, times(1))
+        .alter_table(
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+            eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+            eq(mockTable));
+
+    // New entries are merged into the table parameters; the existing entry is kept
+    Map<String, String> updatedParameters = mockTable.getParameters();
+    assertEquals(3, updatedParameters.size());
+    assertEquals("2023-12-01T12:00:00Z", 
updatedParameters.get("last_synced_time"));
+    assertEquals("user123", updatedParameters.get("last_modified_by"));
+    assertEquals("existing_value", updatedParameters.get("existing_key"));
+  }
+
+  @Test
+  void testUpdateTablePropertiesNoChanges() throws Exception {
+    setupCommonMocks();
+
+    // Empty properties map: there is nothing to write
+    Map<String, String> lastTimeSyncedProperties = Collections.emptyMap();
+
+    // Act
+    hmsPartitionSyncOperations.updateTableProperties(
+        TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties);
+
+    // With nothing to update, neither getTable nor alter_table may be called
+    verify(mockMetaStoreClient, never()).getTable(anyString(), anyString());
+    verify(mockMetaStoreClient, never()).alter_table(anyString(), anyString(), 
any());
+  }
+
+  @Test
+  void testUpdateTablePropertiesThrowsException() throws Exception {
+    setupCommonMocks();
+
+    Map<String, String> lastTimeSyncedProperties =
+        Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z");
+
+    when(mockMetaStoreClient.getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+        .thenThrow(new TException("Test exception"));
+
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsPartitionSyncOperations.updateTableProperties(
+                    TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties));
+
+    assertInstanceOf(TException.class, exception.getCause());
+
+    // getTable is attempted once; alter_table must never be reached
+    verify(mockMetaStoreClient, times(1))
+        .getTable(
+            TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+            TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+    verify(mockMetaStoreClient, never()).alter_table(anyString(), anyString(), 
any());
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
index fad22ad2..9fae7eb2 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -37,6 +38,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.ServiceLoader;
 
 import lombok.SneakyThrows;
@@ -54,6 +56,7 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
 import org.apache.xtable.catalog.CatalogTableBuilder;
 import org.apache.xtable.exception.CatalogSyncException;
 import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
@@ -61,29 +64,29 @@ import org.apache.xtable.model.storage.CatalogType;
 import org.apache.xtable.spi.sync.CatalogSyncClient;
 
 @ExtendWith(MockitoExtension.class)
-public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase {
+public class TestHMSCatalogSyncClient extends HMSCatalogSyncTestBase {
 
   @Mock private CatalogTableBuilder<Table, Table> mockTableBuilder;
+  @Mock private CatalogPartitionSyncTool mockPartitionSyncTool;
   private HMSCatalogSyncClient hmsCatalogSyncClient;
 
-  private HMSCatalogSyncClient createHMSCatalogSyncClient() {
+  private HMSCatalogSyncClient createHMSCatalogSyncClient(boolean 
includePartitionSyncTool) {
+    Optional<CatalogPartitionSyncTool> partitionSyncToolOpt =
+        includePartitionSyncTool ? Optional.of(mockPartitionSyncTool) : 
Optional.empty();
     return new HMSCatalogSyncClient(
         TEST_CATALOG_CONFIG,
         mockHMSCatalogConfig,
         testConfiguration,
         mockMetaStoreClient,
-        mockTableBuilder);
-  }
-
-  void setupCommonMocks() {
-    hmsCatalogSyncClient = createHMSCatalogSyncClient();
+        mockTableBuilder,
+        partitionSyncToolOpt);
   }
 
   @SneakyThrows
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testHasDatabase(boolean isDbPresent) {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     Database db = new Database(TEST_HMS_DATABASE, null, null, 
Collections.emptyMap());
     if (isDbPresent) {
       when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)).thenReturn(db);
@@ -103,7 +106,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   @SneakyThrows
   @Test
   void testHasDatabaseFailure() {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE))
         .thenThrow(new TException("something went wrong"));
     CatalogSyncException exception =
@@ -119,7 +122,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testGetTable(boolean isTablePresent) {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE);
     if (isTablePresent) {
       when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE)).thenReturn(table);
@@ -139,7 +142,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   @SneakyThrows
   @Test
   void testGetTableFailure() {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE))
         .thenThrow(new TException("something went wrong"));
     CatalogSyncException exception =
@@ -156,7 +159,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   void testCreateDatabase(boolean shouldFail) {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     Database database = newDatabase(TEST_HMS_DATABASE);
     if (shouldFail) {
       Mockito.doThrow(new TException("something went wrong"))
@@ -179,7 +182,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   void testDropTable(boolean shouldFail) {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
     if (shouldFail) {
       Mockito.doThrow(new TException("something went wrong"))
           .when(mockMetaStoreClient)
@@ -200,9 +203,10 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
   }
 
   @SneakyThrows
-  @Test
-  void testCreateTable_Success() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_Success(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
     Table testTable = new Table();
     when(mockTableBuilder.getCreateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER))
@@ -211,12 +215,20 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
     verify(mockMetaStoreClient, times(1)).createTable(testTable);
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    if (syncPartitions) {
+      verify(mockPartitionSyncTool, times(1))
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    } else {
+      verify(mockPartitionSyncTool, never())
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    }
   }
 
   @SneakyThrows
-  @Test
-  void testCreateTable_ErrorGettingTableInput() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_ErrorGettingTableInput(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
 
     // error when getting iceberg table input
     doThrow(new RuntimeException("something went wrong"))
@@ -230,12 +242,15 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockMetaStoreClient, never()).createTable(any());
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
   @SneakyThrows
-  @Test
-  void testCreateTable_ErrorCreatingTable() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_ErrorCreatingTable(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
 
     // error when creating table
     Table testTable = new Table();
@@ -257,12 +272,15 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockMetaStoreClient, times(1)).createTable(testTable);
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
   @SneakyThrows
-  @Test
-  void testRefreshTable_Success() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_Success(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
     Table origTable = new Table();
     Table updatedTable = new Table(origTable);
     updatedTable.putToParameters(METADATA_LOCATION_PROP, 
ICEBERG_METADATA_FILE_LOCATION_V2);
@@ -276,12 +294,20 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
     verify(mockTableBuilder, times(1))
         .getUpdateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    if (syncPartitions) {
+      verify(mockPartitionSyncTool, times(1))
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    } else {
+      verify(mockPartitionSyncTool, never())
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    }
   }
 
   @SneakyThrows
-  @Test
-  void testRefreshTable_ErrorGettingUpdatedTable() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_ErrorGettingUpdatedTable(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
 
     // error when getting iceberg table input
     Table testTable = new Table();
@@ -298,12 +324,15 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
         .getUpdateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, testTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockMetaStoreClient, never()).alter_table(any(), any(), any());
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
   @SneakyThrows
-  @Test
-  void testRefreshTable_ErrorRefreshingTable() {
-    setupCommonMocks();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
 
     // error when creating table
     Table origTable = new Table();
@@ -329,12 +358,14 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase {
             TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockMetaStoreClient, times(1))
         .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable);
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
   @SneakyThrows
   @Test
   void testCreateOrReplaceTable() {
-    setupCommonMocks();
+    hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
 
     ZonedDateTime zonedDateTime =
         
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
index 45d45a2d..31ea33c9 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
@@ -18,10 +18,10 @@
  
 package org.apache.xtable.hms;
 
-import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA;
-import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
-import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE;
-import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.FIELD_SCHEMA;
+import static 
org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_DATABASE;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_TABLE;
 import static 
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableParameters;
 import static 
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableStorageDescriptor;
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
index 1a7d0cb4..4e799915 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
@@ -38,11 +38,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
 import org.apache.xtable.model.storage.TableFormat;
 
 @ExtendWith(MockitoExtension.class)
-public class TestDeltaHMSCatalogTableBuilder extends 
HMSCatalogSyncClientTestBase {
+public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
 
   private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider;
 
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..adf88c99
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms.table;
+
+import static 
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
+import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+
+/**
+ * Unit tests for {@link HudiHMSCatalogTableBuilder}, exercised against a mocked
+ * Hudi meta client and a mocked {@link HudiCatalogTablePropertiesExtractor}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
+
+  @Mock private HoodieTableMetaClient mockMetaClient;
+  @Mock private HudiTableManager mockHudiTableManager;
+  @Mock private HoodieTableConfig mockTableConfig;
+  @Mock private HudiCatalogTablePropertiesExtractor mockTablePropertiesExtractor;
+
+  private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder;
+
+  /** Creates the builder under test, wired to the mocks declared on this class. */
+  private HudiHMSCatalogTableBuilder createMockHudiHMSCatalogSyncRequestProvider() {
+    return new HudiHMSCatalogTableBuilder(
+        mockHMSCatalogConfig,
+        HMSSchemaExtractor.getInstance(),
+        mockHudiTableManager,
+        mockMetaClient,
+        mockTablePropertiesExtractor);
+  }
+
+  /** Instantiates the builder and stubs the schema length threshold to 1000. */
+  void setupCommonMocks() {
+    hudiHMSCatalogTableBuilder = createMockHudiHMSCatalogSyncRequestProvider();
+    when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
+  }
+
+  /** Stubs the meta client to return a table config whose base file format is PARQUET. */
+  void setupMetaClientMocks() {
+    when(mockTableConfig.getBaseFileFormat()).thenReturn(HoodieFileFormat.PARQUET);
+    when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
+  }
+
+  /**
+   * Verifies that a create request carries the catalog identifier, the expected
+   * column and partition-key counts, and the Hudi metadata flag, and that the
+   * properties extractor receives the table's partition field names.
+   */
+  @Test
+  void testGetCreateTableInput() {
+    setupCommonMocks();
+    setupMetaClientMocks();
+
+    List<String> partitionFields =
+        TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
+            .map(partitionField -> partitionField.getSourceField().getName())
+            .collect(Collectors.toList());
+
+    Table table =
+        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+            TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+
+    ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(mockTablePropertiesExtractor)
+        .getSparkTableProperties(
+            partitionsCaptor.capture(),
+            eq(""),
+            any(Integer.class),
+            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
+    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
+    assertEquals(partitionFields, partitionsCaptor.getValue());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.getTableName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.getDbName());
+    assertEquals(3, table.getSd().getCols().size());
+    assertEquals(1, table.getPartitionKeys().size());
+    assertNotNull(table.getParameters());
+    assertFalse(table.getParameters().isEmpty());
+    // JUnit 5 signature is assertEquals(expected, actual).
+    assertEquals("true", table.getParameters().get(HUDI_METADATA_CONFIG));
+  }
+
+  /**
+   * Verifies that an update request built from an evolved table keeps the
+   * catalog identifier, exposes the evolved column count, and still carries
+   * the Hudi metadata flag in the updated table's parameters.
+   */
+  @Test
+  void testGetUpdateTableInput() {
+    setupCommonMocks();
+    setupMetaClientMocks();
+
+    List<String> partitionFields =
+        TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
+            .map(partitionField -> partitionField.getSourceField().getName())
+            .collect(Collectors.toList());
+    Table table =
+        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+            TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+    Table updatedTable =
+        hudiHMSCatalogTableBuilder.getUpdateTableRequest(
+            TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, TEST_CATALOG_TABLE_IDENTIFIER);
+
+    // The schema matcher uses the original (non-evolved) table's read schema.
+    ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(mockTablePropertiesExtractor)
+        .getSparkTableProperties(
+            partitionsCaptor.capture(),
+            eq(""),
+            any(Integer.class),
+            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
+    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
+    assertEquals(partitionFields, partitionsCaptor.getValue());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.getTableName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), updatedTable.getDbName());
+    assertEquals(4, updatedTable.getSd().getCols().size());
+    assertEquals(1, updatedTable.getPartitionKeys().size());
+    // Assert on the table returned by the update call, not the create-time input.
+    assertNotNull(updatedTable.getParameters());
+    assertFalse(updatedTable.getParameters().isEmpty());
+    assertEquals("true", updatedTable.getParameters().get(HUDI_METADATA_CONFIG));
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
index 14d39c44..b1e8df77 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
@@ -52,11 +52,11 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.hadoop.HadoopTables;
 
-import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
 import org.apache.xtable.hms.HMSSchemaExtractor;
 
 @ExtendWith(MockitoExtension.class)
-public class TestIcebergHMSCatalogTableBuilder extends 
HMSCatalogSyncClientTestBase {
+public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
 
   @Mock private HadoopTables mockIcebergHadoopTables;
   @Mock private BaseTable mockIcebergBaseTable;
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 349b5ca9..899365e4 100644
--- 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -64,6 +64,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
 import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
 import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.reflection.ReflectionUtils;
 import org.apache.xtable.spi.extractor.CatalogConversionSource;
@@ -153,7 +154,9 @@ public class RunCatalogSync {
         TargetTable targetTable =
             TargetTable.builder()
                 .name(sourceTable.getName())
-                .basePath(sourceTable.getBasePath())
+                .basePath(
+                    getSourceTableLocation(
+                        targetCatalogTableIdentifier.getTableFormat(), 
sourceTable))
                 .namespace(sourceTable.getNamespace())
                 .formatName(targetCatalogTableIdentifier.getTableFormat())
                 .additionalProperties(sourceTable.getAdditionalProperties())
@@ -228,15 +231,25 @@ public class RunCatalogSync {
               .additionalProperties(sourceProperties)
               .build();
     } else if (catalogConversionSource.isPresent()) {
+      TableIdentifier tableIdentifier = 
sourceTableIdentifier.getTableIdentifier();
       sourceTable =
-          catalogConversionSource
-              .get()
-              .getSourceTable(
-                  
getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier()));
+          
catalogConversionSource.get().getSourceTable(getCatalogTableIdentifier(tableIdentifier));
+      if (tableIdentifier.getPartitionSpec() != null) {
+        sourceTable
+            .getAdditionalProperties()
+            .put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, 
tableIdentifier.getPartitionSpec());
+      }
     }
     return sourceTable;
   }
 
+  /**
+   * Picks the location used as the base path of the target table.
+   *
+   * <p>Returns {@code sourceTable.getDataPath()} only when the source format is
+   * Iceberg and the target format is Hudi; every other combination returns
+   * {@code sourceTable.getBasePath()}.
+   *
+   * @param targetTableFormat format name of the target table
+   * @param sourceTable source table whose format name and paths are read
+   * @return the data path for an Iceberg-to-Hudi sync, otherwise the base path
+   */
+  static String getSourceTableLocation(
+      String targetTableFormat, SourceTable sourceTable) {
+    boolean fromIceberg =
+        sourceTable.getFormatName().equals(TableFormat.ICEBERG);
+    if (fromIceberg && targetTableFormat.equals(TableFormat.HUDI)) {
+      return sourceTable.getDataPath();
+    }
+    return sourceTable.getBasePath();
+  }
+
   static Map<String, ConversionSourceProvider> getConversionSourceProviders(
       List<String> tableFormats,
       TableFormatConverters tableFormatConverters,
@@ -345,6 +358,8 @@ public class RunCatalogSync {
        * HierarchicalTableIdentifier}
        */
       String hierarchicalId;
+      /** Optional partition spec of the table, e.g. "cs_sold_date_sk:VALUE". */
+      String partitionSpec;
     }
 
     /**
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml 
b/xtable-utilities/src/test/resources/catalogConfig.yaml
index 05b2df4b..88cff2b6 100644
--- a/xtable-utilities/src/test/resources/catalogConfig.yaml
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -45,6 +45,7 @@ datasets:
   - sourceCatalogTableIdentifier:
       tableIdentifier:
         hierarchicalId: "source-database-1.source-1"
+        partitionSpec: "cs_sold_date_sk:VALUE"
     targetCatalogTableIdentifiers:
       - catalogId: "target-1"
         tableFormat: "DELTA"

Reply via email to